处理演变的工作流
目录
实时 Notebook
您可以在实时会话中运行此 notebook ,或在 Github 上查看。
处理演变的工作流¶
对于某些工作流,我们一开始不知道计算的范围。我们需要进行一些计算,才能确定剩下的计算需要做什么。计算随着我们工作的增多而增长和演变。
例如,考虑一种情况,您需要读取许多文件,然后根据这些文件的内容,启动额外的工作。您希望并行读取这些文件,然后在每个文件内部暴露更多的并行性。
本示例介绍了使用 Dask Futures 处理这种情况的三种方法
使用
as_completed
使用
async/await
从任务中启动任务
但首先,让我们顺序运行代码。
0:顺序代码¶
[1]:
filenames = ["file.{}.txt".format(i) for i in range(10)]
filenames[:3]
[1]:
['file.0.txt', 'file.1.txt', 'file.2.txt']
[2]:
import random, time
def parse_file(fn: str) -> list:
""" Returns a list work items of unknown length """
time.sleep(random.random())
return [random.random() for _ in range(random.randint(1, 10))]
def process_item(x: float):
""" Process each work item """
time.sleep(random.random() / 4)
return x + 1
[3]:
%%time
# This takes around 10-20s
results = []
for fn in filenames:
L = parse_file(fn)
for x in L:
out = process_item(x)
results.append(out)
CPU times: user 8.63 ms, sys: 1 ms, total: 9.63 ms
Wall time: 13.4 s
启动 Dask 客户端¶
我们需要一个 Dask 客户端来管理动态工作负载
[4]:
from dask.distributed import Client
client = Client(processes=False, n_workers=1, threads_per_worker=6)
client
[4]:
客户端
Client-8cd18990-0de0-11ed-9f5a-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表盘: http://10.1.1.64:8787/status |
集群信息
LocalCluster
136395a1
仪表盘: http://10.1.1.64:8787/status | 工作节点 1 |
总线程数 6 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: False |
调度器信息
调度器
Scheduler-32e093ca-1ad3-4cda-a0bf-b68890befde7
通信: inproc://10.1.1.64/8026/1 | 工作节点 1 |
仪表盘: http://10.1.1.64:8787/status | 总线程数 6 |
启动时间: 刚刚 | 总内存: 6.78 GiB |
工作节点
工作节点:0
通信: inproc://10.1.1.64/8026/4 | 总线程数 6 |
仪表盘: http://10.1.1.64:39259/status | 内存: 6.78 GiB |
Nanny: None | |
本地目录: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-v95r5hsj |
1:使用 as_completed¶
as_completed 迭代器允许我们在 futures 完成时对其进行处理。然后我们可以即时提交更多数据。
我们为每个文件名提交一个任务
我们还计算每个返回列表的长度
当这些长度返回时,我们提交一个新任务来获取该列表的每个项目。我们以更高的优先级执行此操作,以便在收集新数据之前处理现有数据。
我们等待所有返回的结果
[5]:
%%time
from dask.distributed import as_completed
import operator
lists = client.map(parse_file, filenames, pure=False)
lengths = client.map(len, lists)
mapping = dict(zip(lengths, lists))
futures = []
for future in as_completed(lengths):
n = future.result()
L = mapping[future]
for i in range(n):
new = client.submit(operator.getitem, L, i, priority=1)
new = client.submit(process_item, new, priority=1)
futures.append(new)
client.gather(futures)
CPU times: user 764 ms, sys: 92.2 ms, total: 856 ms
Wall time: 2.6 s
[5]:
[1.855253279843388,
1.7726329902181985,
1.259836642564709,
1.3256571039929872,
1.5880998029929763,
1.5138789437567746,
1.5792286074896462,
1.3147689951092234,
1.3172076038757292,
1.262171473735921,
1.4714750086893678,
1.201396011598346,
1.0228797089919106,
1.100749927496291,
1.312524856134222,
1.6853261123541268,
1.7828341588461032,
1.353256049336657,
1.5560048785929426,
1.909180985889757,
1.3652870032952238,
1.0200787324818752,
1.6887029263541482,
1.8419102255817514,
1.7088922634312127,
1.0272413357700962,
1.9966520355684192,
1.7515633492540368,
1.1784571947389346,
1.2626529965584385,
1.776210436365392,
1.1980741841717615,
1.6535182981420282,
1.652425843466223,
1.8679564994356905,
1.4829294673653362,
1.5798547500541158,
1.140733801463906,
1.8522280296306772]
2:使用 async/await 在本地处理单个文件¶
我们也可以在本地进程内处理此处的并发性。这需要您理解 async/await 语法,但这通常很强大,而且可以说比上面 as_completed
的方法更简单。
[6]:
import asyncio
async def f(fn):
""" Handle the lifecycle of a single file """
future = client.submit(parse_file, fn, pure=False)
length_future = client.submit(len, future)
length = await length_future
futures = [client.submit(operator.getitem, future, i, priority=10)
for i in range(length)]
futures = client.map(process_item, futures, priority=10)
return futures
async def run_all(filenames):
list_of_list_of_futures = await asyncio.gather(*[f(fn) for fn in filenames])
futures = sum(list_of_list_of_futures, [])
return await client.gather(futures)
我们现在需要在与客户端运行相同的事件循环中运行此函数。如果我们以异步方式启动了客户端,那么我们就可以这样做
client = await Client(asynchronous=True)
await run_all(filenames)
然而,由于我们在没有使用 asynchronous=True
标志的情况下启动了客户端,事件循环实际上正在一个单独的线程中运行,所以我们必须请客户端为我们运行此操作。
[7]:
client.sync(run_all, filenames)
[7]:
[1.800289116905064,
1.9871081844250762,
1.4822374404826442,
1.5398817452739677,
1.7815421767702233,
1.799086158704496,
1.3684979870560656,
1.7599086452147468,
1.481710657277671,
1.498441345100709,
1.4189325403582265,
1.4919506969852225,
1.784110902042696,
1.300637697987958,
1.67384020477976,
1.9299992134781654,
1.5169156883991846,
1.3547854189286748,
1.0158519081255195,
1.2905126990047529,
1.5902047470140799,
1.3370247692084685,
1.8606232787906918,
1.0593086235586204,
1.2708399290512835,
1.6014307072413518,
1.719953843514872,
1.811378106188461,
1.196149275085125,
1.1182782176944635,
1.233133146375533,
1.9828811027418132,
1.0789456550316718,
1.8491111490176726,
1.895451013583132,
1.3219157394238925,
1.7080716916510053,
1.3280857033684856,
1.0402444828283657,
1.6131248314712203,
1.2100254915413697,
1.092415131966796,
1.93960521763009,
1.793696097887303,
1.6268308430745564,
1.1389083858458733,
1.6461270076543006,
1.087262577270262,
1.5348789887982262,
1.7953259319984842]
3:从任务中提交任务¶
我们还可以提交会自行提交更多任务的任务。请参阅此处的文档。
[8]:
%%time
from dask.distributed import get_client, secede, rejoin
def f(fn):
L = parse_file(fn)
client = get_client()
futures = client.map(process_item, L, priority=10)
secede()
results = client.gather(futures)
rejoin()
return results
futures = client.map(f, filenames, pure=False)
results = client.gather(futures)
CPU times: user 357 ms, sys: 20.7 ms, total: 378 ms
Wall time: 2.14 s