易并行工作负载
目录
实时 Notebook
您可以在实时会话中运行此 Notebook 或在 Github 上查看。
易并行工作负载¶
本 Notebook 展示了如何使用 Dask 并行化易并行工作负载,即您希望将一个函数独立应用于许多数据片段的情况。它将展示使用 Dask 实现此目的的三种不同方法:
本示例侧重于使用 Dask 构建大型易并行计算,这常见于科学界和高性能计算设施中,例如蒙特卡洛方法。这类模拟假设以下几点:
我们有一个给定一些参数即可执行繁重计算的函数。
我们需要在许多不同的输入参数上计算此函数,且每个函数调用都是独立的。
我们希望将所有结果收集到一处以便进一步分析。
启动 Dask Client 查看仪表盘¶
启动 Dask Client 将提供一个仪表盘,有助于深入了解计算过程。本示例使用 Futures API 的部分也将需要它。此外,由于这类计算通常在超级计算机或云环境中启动,您最终可能需要启动一个集群并连接一个客户端以实现扩展。请参阅 dask-jobqueue、dask-kubernetes 或 dask-yarn,它们分别提供了在 HPC、云或大数据基础设施上轻松实现此目的的方法。
仪表盘的链接将在您创建下面的客户端时显示。我们建议您在一侧屏幕打开仪表盘,同时在另一侧使用 Notebook。调整窗口位置可能需要一些努力,但在学习时同时查看两者非常有用。
[1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
[1]:
Client
Client-6c01af35-0de0-11ed-9e28-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表盘: http://127.0.0.1:8787/status |
集群信息
LocalCluster
6c9279a2
仪表盘: http://127.0.0.1:8787/status | Workers 1 |
总线程数 4 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: 是 |
调度器信息
Scheduler
Scheduler-859f8895-4dc5-45e4-945c-593b2390b28a
通讯: tcp://127.0.0.1:46295 | Workers 1 |
仪表盘: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 6.78 GiB |
Workers
Worker: 0
通讯: tcp://127.0.0.1:33017 | 总线程数 4 |
仪表盘: http://127.0.0.1:45931/status | 内存: 6.78 GiB |
Nanny: tcp://127.0.0.1:46881 | |
本地目录: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-ii4pk6hj |
定义计算调用函数¶
此函数执行一个简单的操作:将列表/数组中的所有数字相加,但它也会随机休眠一段时间以模拟实际工作。在实际用例中,这可以调用另一个 python 模块,甚至可以使用 subprocess 模块运行可执行文件。
[2]:
import time
import random
def costly_simulation(list_param):
time.sleep(random.random())
return sum(list_param)
我们在下方本地尝试一下
[3]:
%time costly_simulation([1, 2, 3, 4])
CPU times: user 2.24 ms, sys: 1.62 ms, total: 3.85 ms
Wall time: 146 ms
[3]:
10
定义函数的输入参数集¶
我们将生成一组输入,希望在其上运行我们的模拟函数。这里我们使用 Pandas dataframe,但也可以使用简单的列表。假设我们的模拟使用四个名为 param_[a-d] 的参数运行。
[4]:
import pandas as pd
import numpy as np
input_params = pd.DataFrame(np.random.random(size=(500, 4)),
columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()
[4]:
param_a | param_b | param_c | param_d | |
---|---|---|---|---|
0 | 0.312124 | 0.071036 | 0.799919 | 0.766664 |
1 | 0.956577 | 0.471111 | 0.554442 | 0.263298 |
2 | 0.583202 | 0.789435 | 0.135610 | 0.622723 |
3 | 0.990032 | 0.253090 | 0.225035 | 0.190979 |
4 | 0.130992 | 0.303275 | 0.514555 | 0.420719 |
不使用 Dask,我们可以使用普通的 Python for 循环在所有这些参数上调用我们的模拟。
我们只在我们参数的样本上进行此操作,否则会相当耗时。
[5]:
results = []
[6]:
%%time
for parameters in input_params.values[:10]:
result = costly_simulation(parameters)
results.append(result)
CPU times: user 123 ms, sys: 15.1 ms, total: 138 ms
Wall time: 4.25 s
[7]:
results
[7]:
[1.9497434903818136,
2.245428056748925,
2.1309710532899184,
1.65913554017504,
1.3695413820396394,
1.5063310042417761,
1.2444777543824386,
1.531794781158632,
1.3147033688719252,
2.925532773375028]
请注意,这不是很明智,因为我们可以轻松地并行化代码。
在 Python 中有许多方法可以使用像 multiprocessing
、concurrent.futures
、joblib
等库来并行化此函数。这些都是不错的起点。Dask 是一个很好的下一步,特别是当您想跨多台机器进行扩展时。
使用 Dask Delayed 使函数延迟执行¶
我们可以在函数上调用 dask.delayed
使其变为延迟执行。它不会立即计算结果,而是将我们想计算的内容记录为一个任务到图中,稍后我们将在并行硬件上运行该图。使用 dask.delayed
是一种相对直接的方法来并行化现有代码库,即使计算不像本例这样易于并行化。
现在调用这些延迟函数几乎是免费的。在下面的单元格中,我们只构建了一个简单的图。
[8]:
import dask
lazy_results = []
[9]:
%%time
for parameters in input_params.values[:10]:
lazy_result = dask.delayed(costly_simulation)(parameters)
lazy_results.append(lazy_result)
CPU times: user 1.43 ms, sys: 0 ns, total: 1.43 ms
Wall time: 1.08 ms
[10]:
lazy_results[0]
[10]:
Delayed('costly_simulation-ae2bb390-5058-4db9-a033-6498fb822270')
并行运行¶
lazy_results
列表包含十次尚未运行的 costly_simulation
调用信息。当您希望结果作为普通 Python 对象时,调用 .compute()
。
如果您在上面启动了 Client()
,那么您可能希望在计算期间查看状态页面。
[11]:
%time dask.compute(*lazy_results)
CPU times: user 261 ms, sys: 43.6 ms, total: 304 ms
Wall time: 1.44 s
[11]:
(1.9497434903818136,
2.245428056748925,
2.1309710532899184,
1.65913554017504,
1.3695413820396394,
1.5063310042417761,
1.2444777543824386,
1.531794781158632,
1.3147033688719252,
2.925532773375028)
请注意,这比使用 for 循环按顺序运行相同的计算要快。
我们现在可以在所有输入参数上运行此操作
[12]:
import dask
lazy_results = []
for parameters in input_params.values:
lazy_result = dask.delayed(costly_simulation)(parameters)
lazy_results.append(lazy_result)
futures = dask.persist(*lazy_results) # trigger computation in the background
为了让速度更快,我们可以添加更多的 worker。
(尽管我们仍然只在本地机器上工作,但在使用实际集群时这更实用)
[13]:
client.cluster.scale(10) # ask for ten 4-thread workers
通过查看 Dask 仪表盘,我们可以看到 Dask 将这些工作分散到我们的集群中,管理负载均衡、依赖关系等。
然后获取结果
[14]:
results = dask.compute(*futures)
results[:5]
[14]:
(1.9497434903818136,
2.245428056748925,
2.1309710532899184,
1.65913554017504,
1.3695413820396394)
使用 Futures API¶
使用 Dask 的 Futures API 也可以实现相同的示例,通过使用 client
对象本身即可。对于将一个函数应用于许多输入的用例,Dask delayed 和 Dask Futures 都同样有用。Futures API 有点不同,因为它会立即开始工作,而不是完全延迟执行。
例如,注意在下面的单元格中,当我们向集群提交工作时,工作会立即开始
[15]:
futures = []
for parameters in input_params.values:
future = client.submit(costly_simulation, parameters)
futures.append(future)
我们可以通过调用 client.gather
显式等待工作完成并将结果收集到本地进程
[16]:
results = client.gather(futures)
results[:5]
[16]:
[1.9497434903818136,
2.245428056748925,
2.1309710532899184,
1.65913554017504,
1.3695413820396394]
但上面的代码可以使用 client.map()
函数以更少的行数运行,该函数允许在参数列表上调用给定函数。
与 delayed 类似,我们可以只启动计算而不立即调用 client.gather()
来等待结果。
需要注意的是,由于 Dask 集群已经使用 Futures API 在给定的输入参数上执行了启动 costly_simulation
的任务,调用 client.map()
实际上不会触发任何新的计算,而只会检索已经计算出的结果。
[17]:
futures = client.map(costly_simulation, input_params.values)
然后稍后获取结果
[18]:
results = client.gather(futures)
len(results)
[18]:
500
[19]:
print(results[0])
1.9497434903818136
我们建议您查看 仪表盘的状态页面 以查看正在进行的计算。
对结果进行一些分析¶
Dask 在此的优势之一,除了 API 的简洁性之外,还在于您能够通过一次调用收集所有模拟的结果。无需实现复杂的机制或将单个结果写入共享文件系统或对象存储。
只需获取您的结果,然后进行一些计算。
这里,我们将只获取结果并扩展我们的初始 dataframe,以便清晰地查看计算的参数与结果。
[20]:
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)
[20]:
param_a | param_b | param_c | param_d | result | |
---|---|---|---|---|---|
94 | 0.398214 | 0.472349 | 0.366929 | 0.822720 | 2.060212 |
114 | 0.808715 | 0.863210 | 0.818088 | 0.396112 | 2.886125 |
342 | 0.072536 | 0.814248 | 0.309066 | 0.631732 | 1.827582 |
427 | 0.822234 | 0.947608 | 0.423060 | 0.200821 | 2.393723 |
322 | 0.358192 | 0.228315 | 0.864903 | 0.195684 | 1.647095 |
然后我们可以在此处使用 pandas 接口绘制一些美观的统计图或将结果保存在本地
[21]:
%matplotlib inline
output['result'].plot()
[21]:
<AxesSubplot:>

[22]:
output['result'].mean()
[22]:
1.9844499213466507
[23]:
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')
244
使用 Bag 处理大型模拟¶
上述方法对于输入参数数量最多约 100,000 的情况效果很好。超过这个数量,Dask 调度器难以处理需要调度到 worker 的任务量。解决此问题的方法是将许多参数捆绑到一个任务中。您可以通过创建一个新函数来处理一批参数并对该函数使用 delayed 或 futures API 来实现此目的。您也可以使用 Dask Bag API。有关此内容的更多描述,请参见文档中关于避免过多任务的部分。
Dask Bag 以少量分区存储大型序列。我们可以将 input_params
序列转换为 dask.bag
集合,请求较少的分区(因此最多 100,000 个,这已经很大),并将我们的函数应用于 bag 中的每个项。
[24]:
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)
[25]:
%time results_bag = b.compute()
CPU times: user 926 ms, sys: 121 ms, total: 1.05 s
Wall time: 7.79 s
在此查看仪表盘,您应该会看到只有 100 个任务需要运行,而不是 500 个,每个任务平均耗时增加 5 倍,因为每个任务实际上调用了我们的函数 5 次。
[26]:
np.all(results) == np.all(results_bag)
[26]:
True