实时 Notebook

您可以在实时会话中运行此 Notebook Binder徽章 或在 Github 上查看

Dask logo

易并行工作负载

本 Notebook 展示了如何使用 Dask 并行化易并行工作负载,即您希望将一个函数独立应用于许多数据片段的情况。它将展示使用 Dask 实现此目的的三种不同方法:

  1. dask.delayed

  2. concurrent.Futures

  3. dask.bag

本示例侧重于使用 Dask 构建大型易并行计算,这常见于科学界和高性能计算设施中,例如蒙特卡洛方法。这类模拟假设以下几点:

  • 我们有一个给定一些参数即可执行繁重计算的函数。

  • 我们需要在许多不同的输入参数上计算此函数,且每个函数调用都是独立的。

  • 我们希望将所有结果收集到一处以便进一步分析。

启动 Dask Client 查看仪表盘

启动 Dask Client 将提供一个仪表盘,有助于深入了解计算过程。本示例使用 Futures API 的部分也将需要它。此外,由于这类计算通常在超级计算机或云环境中启动,您最终可能需要启动一个集群并连接一个客户端以实现扩展。请参阅 dask-jobqueuedask-kubernetesdask-yarn,它们分别提供了在 HPC、云或大数据基础设施上轻松实现此目的的方法。

仪表盘的链接将在您创建下面的客户端时显示。我们建议您在一侧屏幕打开仪表盘,同时在另一侧使用 Notebook。调整窗口位置可能需要一些努力,但在学习时同时查看两者非常有用。

[1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
[1]:

Client

Client-6c01af35-0de0-11ed-9e28-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

定义计算调用函数

此函数执行一个简单的操作:将列表/数组中的所有数字相加,但它也会随机休眠一段时间以模拟实际工作。在实际用例中,这可以调用另一个 python 模块,甚至可以使用 subprocess 模块运行可执行文件。

[2]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)

我们在下方本地尝试一下

[3]:
%time costly_simulation([1, 2, 3, 4])
CPU times: user 2.24 ms, sys: 1.62 ms, total: 3.85 ms
Wall time: 146 ms
[3]:
10

定义函数的输入参数集

我们将生成一组输入,希望在其上运行我们的模拟函数。这里我们使用 Pandas dataframe,但也可以使用简单的列表。假设我们的模拟使用四个名为 param_[a-d] 的参数运行。

[4]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()
[4]:
param_a param_b param_c param_d
0 0.312124 0.071036 0.799919 0.766664
1 0.956577 0.471111 0.554442 0.263298
2 0.583202 0.789435 0.135610 0.622723
3 0.990032 0.253090 0.225035 0.190979
4 0.130992 0.303275 0.514555 0.420719

不使用 Dask,我们可以使用普通的 Python for 循环在所有这些参数上调用我们的模拟。

我们只在我们参数的样本上进行此操作,否则会相当耗时。

[5]:
results = []
[6]:
%%time
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)
CPU times: user 123 ms, sys: 15.1 ms, total: 138 ms
Wall time: 4.25 s
[7]:
results
[7]:
[1.9497434903818136,
 2.245428056748925,
 2.1309710532899184,
 1.65913554017504,
 1.3695413820396394,
 1.5063310042417761,
 1.2444777543824386,
 1.531794781158632,
 1.3147033688719252,
 2.925532773375028]

请注意,这不是很明智,因为我们可以轻松地并行化代码。

在 Python 中有许多方法可以使用像 multiprocessingconcurrent.futuresjoblib 等库来并行化此函数。这些都是不错的起点。Dask 是一个很好的下一步,特别是当您想跨多台机器进行扩展时。

使用 Dask Delayed 使函数延迟执行

我们可以在函数上调用 dask.delayed 使其变为延迟执行。它不会立即计算结果,而是将我们想计算的内容记录为一个任务到图中,稍后我们将在并行硬件上运行该图。使用 dask.delayed 是一种相对直接的方法来并行化现有代码库,即使计算不像本例这样易于并行化。

现在调用这些延迟函数几乎是免费的。在下面的单元格中,我们只构建了一个简单的图。

[8]:
import dask
lazy_results = []
[9]:
%%time

for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
CPU times: user 1.43 ms, sys: 0 ns, total: 1.43 ms
Wall time: 1.08 ms
[10]:
lazy_results[0]
[10]:
Delayed('costly_simulation-ae2bb390-5058-4db9-a033-6498fb822270')

并行运行

lazy_results 列表包含十次尚未运行的 costly_simulation 调用信息。当您希望结果作为普通 Python 对象时,调用 .compute()

如果您在上面启动了 Client(),那么您可能希望在计算期间查看状态页面。

[11]:
%time dask.compute(*lazy_results)
CPU times: user 261 ms, sys: 43.6 ms, total: 304 ms
Wall time: 1.44 s
[11]:
(1.9497434903818136,
 2.245428056748925,
 2.1309710532899184,
 1.65913554017504,
 1.3695413820396394,
 1.5063310042417761,
 1.2444777543824386,
 1.531794781158632,
 1.3147033688719252,
 2.925532773375028)

请注意,这比使用 for 循环按顺序运行相同的计算要快。

我们现在可以在所有输入参数上运行此操作

[12]:
import dask
lazy_results = []

for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results)  # trigger computation in the background

为了让速度更快,我们可以添加更多的 worker。

(尽管我们仍然只在本地机器上工作,但在使用实际集群时这更实用)

[13]:
client.cluster.scale(10)  # ask for ten 4-thread workers

通过查看 Dask 仪表盘,我们可以看到 Dask 将这些工作分散到我们的集群中,管理负载均衡、依赖关系等。

然后获取结果

[14]:
results = dask.compute(*futures)
results[:5]
[14]:
(1.9497434903818136,
 2.245428056748925,
 2.1309710532899184,
 1.65913554017504,
 1.3695413820396394)

使用 Futures API

使用 Dask 的 Futures API 也可以实现相同的示例,通过使用 client 对象本身即可。对于将一个函数应用于许多输入的用例,Dask delayed 和 Dask Futures 都同样有用。Futures API 有点不同,因为它会立即开始工作,而不是完全延迟执行。

例如,注意在下面的单元格中,当我们向集群提交工作时,工作会立即开始

[15]:
futures = []
for parameters in input_params.values:
    future = client.submit(costly_simulation, parameters)
    futures.append(future)

我们可以通过调用 client.gather 显式等待工作完成并将结果收集到本地进程

[16]:
results = client.gather(futures)
results[:5]
[16]:
[1.9497434903818136,
 2.245428056748925,
 2.1309710532899184,
 1.65913554017504,
 1.3695413820396394]

但上面的代码可以使用 client.map() 函数以更少的行数运行,该函数允许在参数列表上调用给定函数。

与 delayed 类似,我们可以只启动计算而不立即调用 client.gather() 来等待结果。

需要注意的是,由于 Dask 集群已经使用 Futures API 在给定的输入参数上执行了启动 costly_simulation 的任务,调用 client.map() 实际上不会触发任何新的计算,而只会检索已经计算出的结果。

[17]:
futures = client.map(costly_simulation, input_params.values)

然后稍后获取结果

[18]:
results = client.gather(futures)
len(results)
[18]:
500
[19]:
print(results[0])
1.9497434903818136

我们建议您查看 仪表盘的状态页面 以查看正在进行的计算。

对结果进行一些分析

Dask 在此的优势之一,除了 API 的简洁性之外,还在于您能够通过一次调用收集所有模拟的结果。无需实现复杂的机制或将单个结果写入共享文件系统或对象存储。

只需获取您的结果,然后进行一些计算。

这里,我们将只获取结果并扩展我们的初始 dataframe,以便清晰地查看计算的参数与结果。

[20]:
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)
[20]:
param_a param_b param_c param_d result
94 0.398214 0.472349 0.366929 0.822720 2.060212
114 0.808715 0.863210 0.818088 0.396112 2.886125
342 0.072536 0.814248 0.309066 0.631732 1.827582
427 0.822234 0.947608 0.423060 0.200821 2.393723
322 0.358192 0.228315 0.864903 0.195684 1.647095

然后我们可以在此处使用 pandas 接口绘制一些美观的统计图或将结果保存在本地

[21]:
%matplotlib inline
output['result'].plot()
[21]:
<AxesSubplot:>
../_images/applications_embarrassingly-parallel_41_1.png
[22]:
output['result'].mean()
[22]:
1.9844499213466507
[23]:
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')
244

使用 Bag 处理大型模拟

上述方法对于输入参数数量最多约 100,000 的情况效果很好。超过这个数量,Dask 调度器难以处理需要调度到 worker 的任务量。解决此问题的方法是将许多参数捆绑到一个任务中。您可以通过创建一个新函数来处理一批参数并对该函数使用 delayed 或 futures API 来实现此目的。您也可以使用 Dask Bag API。有关此内容的更多描述,请参见文档中关于避免过多任务的部分。

Dask Bag 以少量分区存储大型序列。我们可以将 input_params 序列转换为 dask.bag 集合,请求较少的分区(因此最多 100,000 个,这已经很大),并将我们的函数应用于 bag 中的每个项。

[24]:
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)
[25]:
%time results_bag = b.compute()
CPU times: user 926 ms, sys: 121 ms, total: 1.05 s
Wall time: 7.79 s

在此查看仪表盘,您应该会看到只有 100 个任务需要运行,而不是 500 个,每个任务平均耗时增加 5 倍,因为每个任务实际上调用了我们的函数 5 次。

[26]:
np.all(results) == np.all(results_bag)
[26]:
True