在线 Notebook

您可以在在线会话中运行此 Notebook Binder,或在 Github 上查看它。

使用 Dask Delayed 处理自定义工作负载

Dask logo

因为并非所有问题都适用于数据帧

本 Notebook 展示了如何使用 dask.delayed 来并行化通用 Python 代码。

Dask.delayed 是一种简单而强大的并行化现有代码的方式。它允许用户将函数调用延迟到一个具有依赖关系的任务图中。Dask.delayed 不提供像 Dask.dataframe 那样花哨的并行算法,但它确实让用户完全控制他们想要构建的内容。

像 Dask.dataframe 这样的系统就是用 Dask.delayed 构建的。如果你的问题可以并行化,但不像一个大数组或一个大数据帧那么简单,那么 dask.delayed 可能是你的正确选择。

启动 Dask 客户端以查看仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于了解计算过程很有用。

当您在下方创建客户端时,仪表板的链接将可见。我们建议在您使用 Notebook 的同时,将仪表板打开在屏幕的一侧。这可能需要一些精力来安排您的窗口,但在学习时同时看到它们两者非常有用。

[1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
[1]:

客户端

Client-ec34dddc-0ddf-11ed-9937-000d3a8f7959

连接方法: Cluster 对象 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

创建简单函数

这些函数执行简单的操作,例如将两个数字相加,但它们会随机休眠一段时间以模拟实际工作。

[2]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

我们可以像下面的普通 Python 函数一样运行它们

[3]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z
CPU times: user 63.7 ms, sys: 9.27 ms, total: 73 ms
Wall time: 1.96 s
[3]:
3

它们一个接一个地按顺序运行。但请注意,前两行 inc(1)dec(2) 彼此不依赖,如果足够聪明,我们本来可以并行调用它们。

使用 Dask Delayed 注解函数使其惰性化

我们可以对函数调用 dask.delayed 使其惰性化。它们不会立即计算结果,而是将我们要计算的内容记录为一个任务,放入稍后将在并行硬件上运行的任务图中。

[4]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

现在调用这些惰性函数几乎是免费的。我们只是在构建一个图。

[5]:
x = inc(1)
y = dec(2)
z = add(x, y)
z
[5]:
Delayed('add-06d4e85b-8b4e-4b7c-89d1-410e5876095b')

可视化计算

你需要安装 graphviz 才能使其工作

[6]:
z.visualize(rankdir='LR')
[6]:
_images/delayed_13_0.png

并行运行

当你想要将结果作为普通的 Python 对象时,调用 .compute()

如果你在上面启动了 Client(),那么你可能想在计算过程中查看状态页面。

[7]:
z.compute()
[7]:
3

并行化普通 Python 代码

现在我们在普通的 for 循环 Python 代码中使用 Dask。这会生成任务图而不是直接执行计算,但看起来仍然像我们之前的代码。Dask 是一种方便的方式,可以为现有工作流添加并行性。

[8]:
zs = []
[9]:
%%time
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)
CPU times: user 117 ms, sys: 4.68 ms, total: 121 ms
Wall time: 115 ms
[10]:
zs = dask.persist(*zs)  # trigger computation in the background

为了加快速度,添加更多工作节点。

(尽管我们仍然只在本地机器上工作,但在使用实际集群时这更实用)

[11]:
client.cluster.scale(10)  # ask for ten 4-thread workers

通过查看 Dask 仪表板,我们可以看到 Dask 将这项工作分散到我们的集群中,管理负载均衡、依赖关系等。

自定义计算:树状求和

作为一个非平凡算法的例子,考虑经典的树状规约。我们通过嵌套的 for 循环和一些普通的 Python 逻辑来实现这一点。

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
[12]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        lazy = add(L[i], L[i + 1])  # add neighbors
        new_L.append(lazy)
    L = new_L                       # swap old list for new

dask.compute(L)
[12]:
([65536],)

如果您正在查看仪表板的状态页面,那么您可能需要注意两件事

  1. 红色条表示工作节点间通信。它们发生在不同工作节点需要合并其中间值时

  2. 开始时有很多并行性,但随着我们到达树的顶部,工作量减少,并行性也随之减少。

或者,您可能想导航到仪表板的图页面,然后再次运行上面的单元格。您将能够在计算过程中看到任务图的演变。

延伸阅读

要深入了解 Dask 中的 delayed 和惰性操作,请参阅dask 教程,notebooks 01 和 01x。