实时 Notebook

您可以 在实时会话中 运行此 notebook Binder,或 在 Github 上 查看它。

使用 Futures 的自定义工作负载

Dask logo

Dask futures 为自定义场景提供细粒度的实时执行。这是 Dask 数组和 DataFrames 等其他 API 的基础。

启动 Dask Client

与数组和 DataFrames 不同,使用 Futures 接口需要 Dask client。此外,client 提供了一个仪表板,有助于了解计算情况。

当您在下面创建 client 时,仪表板的链接将可见。我们建议您在一侧屏幕上打开仪表板,同时在另一侧使用您的 notebook。这可能需要一些工夫来排列窗口,但在学习时同时看到两者非常有用。

[1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
[1]:

Client

Client-fee34b1a-0ddf-11ed-9a76-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

创建简单函数

这些函数执行简单的操作,例如将两个数字相加,但它们会随机休眠一段时间来模拟实际工作。

[2]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y

我们可以在本地运行它们

[3]:
inc(1)
[3]:
2

或者我们可以将它们提交给 Dask 远程运行。这会立即返回一个指向正在进行的计算的 future,并最终指向存储的结果。

[4]:
future = client.submit(inc, 1)  # returns immediately with pending future
future
[4]:
Future: inc 状态: 待处理, 类型: NoneType, 键: inc-6a39f735d591f959f1c67a637717a113

如果您等待一秒钟,然后再次检查 future,您会看到它已经完成。

[5]:
future  # scheduler and client talk constantly
[5]:
Future: inc 状态: 待处理, 类型: NoneType, 键: inc-6a39f735d591f959f1c67a637717a113

您可以使用 .result() 方法阻塞计算并收集结果。

[6]:
future.result()
[6]:
2

链式依赖

您可以在其他 futures 上提交任务。这将在输入和输出之间创建依赖关系。Dask 将跟踪所有任务的执行,确保下游任务在适当的时间、地点和使用适当的数据运行。

[7]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z
[7]:
Future: add 状态: 待处理, 类型: NoneType, 键: add-a255b15196528b8a69e0ea73bff2be61
[8]:
z.result()
[8]:
6

请注意,我们从未阻塞 xy,也无需将它们的数据移回我们的 notebook。

提交许多任务

至此,我们已经学会了如何在远程运行 Python 函数。当我们添加两样东西时,这将变得非常有用:

  1. 我们可以每秒提交数千个任务

  2. 任务可以通过消耗 futures 作为输入来相互依赖

我们在一个普通的 Python for 循环中提交许多相互依赖的任务

[9]:
zs = []
[10]:
%%time

for i in range(256):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = inc(x)
    z = client.submit(add, x, y)  # z = inc(y)
    zs.append(z)
CPU times: user 2.58 s, sys: 92.2 ms, total: 2.68 s
Wall time: 2.57 s
[11]:
total = client.submit(sum, zs)

为了让它运行得更快,可以添加更多核心的额外 workers

(尽管我们仍然只在本地机器上工作,但在使用实际集群时,这会更实用)

[12]:
client.cluster.scale(10)  # ask for ten 4-thread workers

自定义计算:树状求和

作为一个非平凡算法的示例,考虑经典的树状归约。我们通过嵌套的 for 循环和一些普通的 Python 逻辑来实现它。

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
[13]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = client.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new

如果您正在查看 仪表板的状态页面,那么您可能想注意两点:

  1. 红色条表示 worker 间的通信。它们发生在不同 worker 需要组合其中间值时

  2. 开始时有很多并行性,但越到后面越少,因为我们到达树的顶部,那里需要做的工作较少。

或者,您可能想导航到 仪表板的图页面,然后再次运行上面的单元格。您将能够看到任务图在计算过程中演变。

动态构建计算

在上面的示例中,我们提前显式指定了任务图。例如,我们知道列表 L 中的前两个 futures 将相加。

然而,有时这并非总是最好的方法,有时您希望在计算进行时动态定义计算。例如,我们可能希望根据哪个 future 先出现来对这些值求和,而不是根据它们最初在列表中放置的顺序。

为此,我们可以使用诸如 as_completed 的操作。

我们建议在运行此计算时查看仪表板的图页面。您应该会看到图在执行过程中自行构建。

[14]:
del future, L, new_L, total  # clear out some old work
[15]:
from dask.distributed import as_completed

zs = client.map(inc, zs)
seq = as_completed(zs)

while seq.count() > 1:  # at least two futures left
    a = next(seq)
    b = next(seq)
    new = client.submit(add, a, b, priority=1)  # add them together
    seq.add(new)                                # add new future back into loop