Live Notebook

You can run this notebook in a live session on Binder, or view it on GitHub.

Handle Evolving Workflows

For some workflows we don't know the extent of the computation at the outset. We need to do some computation in order to figure out the rest of the computation that we need to do. The computation grows and evolves as we do more work.

For example, consider a situation where you need to read many files and then, based on the contents of those files, fire off additional work. You want to read the files in parallel and then expose more parallelism within each file.

This example goes through three ways to handle this situation using Dask futures:

  1. Use as_completed

  2. Use async/await

  3. Launch tasks from tasks

But first, let's run the code sequentially.

0: Sequential code

[1]:
filenames = ["file.{}.txt".format(i) for i in range(10)]

filenames[:3]
[1]:
['file.0.txt', 'file.1.txt', 'file.2.txt']
[2]:
import random, time


def parse_file(fn: str) -> list:
    """ Returns a list work items of unknown length """
    time.sleep(random.random())
    return [random.random() for _ in range(random.randint(1, 10))]

def process_item(x: float):
    """ Process each work item """
    time.sleep(random.random() / 4)
    return x + 1
[3]:
%%time

# This takes around 10-20s

results = []

for fn in filenames:
    L = parse_file(fn)
    for x in L:
        out = process_item(x)
        results.append(out)
CPU times: user 8.63 ms, sys: 1 ms, total: 9.63 ms
Wall time: 13.4 s

Start a Dask Client

We will need a Dask client in order to manage dynamic workloads.

[4]:
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=6)
client
[4]:

Client

Client-8cd18990-0de0-11ed-9f5a-000d3a8f7959

Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: http://10.1.1.64:8787/status

Cluster Info

1: Use as_completed

The as_completed iterator lets us handle futures as they complete. We can then submit more data on the fly (see the short warm-up sketch after the list below).

  • We submit a task for each file name

  • We also compute the length of each returned list

  • As those lengths come back, we submit a new task to get each item of that list. We do this at higher priority, so that we process existing data before we collect new data.

  • We wait on all of the returned results
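
As a warm-up, here is a tiny illustration of completion-order iteration on its own; the squaring lambda is just a hypothetical stand-in for this sketch, and the real workflow follows below.

from dask.distributed import as_completed

# Hypothetical demo tasks: results arrive in completion order,
# not submission order
demo = client.map(lambda x: x ** 2, range(4), pure=False)
for fut in as_completed(demo):
    print(fut.result())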

[5]:
%%time

from dask.distributed import as_completed
import operator

lists = client.map(parse_file, filenames, pure=False)
lengths = client.map(len, lists)

mapping = dict(zip(lengths, lists))

futures = []

for future in as_completed(lengths):
    n = future.result()
    L = mapping[future]
    for i in range(n):
        new = client.submit(operator.getitem, L, i, priority=1)
        new = client.submit(process_item, new, priority=1)
        futures.append(new)

client.gather(futures)
CPU times: user 764 ms, sys: 92.2 ms, total: 856 ms
Wall time: 2.6 s
[5]:
[1.855253279843388,
 1.7726329902181985,
 1.259836642564709,
 1.3256571039929872,
 1.5880998029929763,
 1.5138789437567746,
 1.5792286074896462,
 1.3147689951092234,
 1.3172076038757292,
 1.262171473735921,
 1.4714750086893678,
 1.201396011598346,
 1.0228797089919106,
 1.100749927496291,
 1.312524856134222,
 1.6853261123541268,
 1.7828341588461032,
 1.353256049336657,
 1.5560048785929426,
 1.909180985889757,
 1.3652870032952238,
 1.0200787324818752,
 1.6887029263541482,
 1.8419102255817514,
 1.7088922634312127,
 1.0272413357700962,
 1.9966520355684192,
 1.7515633492540368,
 1.1784571947389346,
 1.2626529965584385,
 1.776210436365392,
 1.1980741841717615,
 1.6535182981420282,
 1.652425843466223,
 1.8679564994356905,
 1.4829294673653362,
 1.5798547500541158,
 1.140733801463906,
 1.8522280296306772]

2: Use async/await to handle single file processing locally

We can also handle the concurrency here within our local process. This requires you to understand async/await syntax, but is generally powerful and arguably simpler than the as_completed approach above.

[6]:
import asyncio

async def f(fn):
    """ Handle the lifecycle of a single file """
    future = client.submit(parse_file, fn, pure=False)
    length_future = client.submit(len, future)
    length = await length_future

    futures = [client.submit(operator.getitem, future, i, priority=10)
               for i in range(length)]
    futures = client.map(process_item, futures, priority=10)
    return futures

async def run_all(filenames):
    list_of_list_of_futures = await asyncio.gather(*[f(fn) for fn in filenames])
    futures = sum(list_of_list_of_futures, [])
    return await client.gather(futures)

We now need to run this function in the same event loop as our client. If we had started our client asynchronously, then we could have done this:

client = await Client(asynchronous=True)

await run_all(filenames)
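
As a standalone script, that fully asynchronous variant might look like the following minimal sketch, assuming the f and run_all definitions above (the global statement is needed because they refer to the module-level client):

import asyncio
from dask.distributed import Client

async def main():
    global client  # f and run_all above use the module-level client
    client = await Client(asynchronous=True, processes=False)
    results = await run_all(filenames)
    await client.close()
    return results

# results = asyncio.run(main())  # from a plain script, not inside this notebook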

However, because we started our client without the asynchronous=True flag, the event loop is actually running in a separate thread, so we will have to ask the client to run this for us.

[7]:
client.sync(run_all, filenames)
[7]:
[1.800289116905064,
 1.9871081844250762,
 1.4822374404826442,
 1.5398817452739677,
 1.7815421767702233,
 1.799086158704496,
 1.3684979870560656,
 1.7599086452147468,
 1.481710657277671,
 1.498441345100709,
 1.4189325403582265,
 1.4919506969852225,
 1.784110902042696,
 1.300637697987958,
 1.67384020477976,
 1.9299992134781654,
 1.5169156883991846,
 1.3547854189286748,
 1.0158519081255195,
 1.2905126990047529,
 1.5902047470140799,
 1.3370247692084685,
 1.8606232787906918,
 1.0593086235586204,
 1.2708399290512835,
 1.6014307072413518,
 1.719953843514872,
 1.811378106188461,
 1.196149275085125,
 1.1182782176944635,
 1.233133146375533,
 1.9828811027418132,
 1.0789456550316718,
 1.8491111490176726,
 1.895451013583132,
 1.3219157394238925,
 1.7080716916510053,
 1.3280857033684856,
 1.0402444828283657,
 1.6131248314712203,
 1.2100254915413697,
 1.092415131966796,
 1.93960521763009,
 1.793696097887303,
 1.6268308430745564,
 1.1389083858458733,
 1.6461270076543006,
 1.087262577270262,
 1.5348789887982262,
 1.7953259319984842]

3: Submit tasks from tasks

We can also submit tasks that themselves submit more tasks. See the documentation here.

[8]:
%%time

from dask.distributed import get_client, secede, rejoin

def f(fn):
    L = parse_file(fn)
    client = get_client()  # get the client from within a running task

    futures = client.map(process_item, L, priority=10)
    secede()  # leave the worker's thread pool while we wait
    results = client.gather(futures)
    rejoin()  # rejoin the thread pool once results are ready
    return results

futures = client.map(f, filenames, pure=False)
results = client.gather(futures)
CPU times: user 357 ms, sys: 20.7 ms, total: 378 ms
Wall time: 2.14 s
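
The secede/rejoin bookkeeping can also be handled automatically by the worker_client context manager, which leaves the worker's thread pool on entry and rejoins on exit. A minimal sketch of the same function written that way, assuming the definitions above:

from dask.distributed import worker_client

def f(fn):
    L = parse_file(fn)
    # worker_client secedes on entry and rejoins on exit for us
    with worker_client() as client:
        futures = client.map(process_item, L, priority=10)
        results = client.gather(futures)
    return results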