使用 Futures 的自定义工作负载
目录
实时 Notebook
您可以 在实时会话中 运行此 notebook ,或 在 Github 上 查看它。
使用 Futures 的自定义工作负载¶
Dask futures 为自定义场景提供细粒度的实时执行。这是 Dask 数组和 DataFrames 等其他 API 的基础。
启动 Dask Client¶
与数组和 DataFrames 不同,使用 Futures 接口需要 Dask client。此外,client 提供了一个仪表板,有助于了解计算情况。
当您在下面创建 client 时,仪表板的链接将可见。我们建议您在一侧屏幕上打开仪表板,同时在另一侧使用您的 notebook。这可能需要一些工夫来排列窗口,但在学习时同时看到两者非常有用。
[1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
[1]:
Client
Client-fee34b1a-0ddf-11ed-9a76-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
cb0544e0
仪表板: http://127.0.0.1:8787/status | Workers 1 |
总线程数 4 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-8e5a5d68-572d-407a-addc-5d81df8d4c5f
通信: tcp://127.0.0.1:38745 | Workers 1 |
仪表板: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 6.78 GiB |
Workers
Worker: 0
通信: tcp://127.0.0.1:37435 | 总线程数 4 |
仪表板: http://127.0.0.1:36253/status | 内存: 6.78 GiB |
Nanny: tcp://127.0.0.1:33885 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-5e0um0el |
创建简单函数¶
这些函数执行简单的操作,例如将两个数字相加,但它们会随机休眠一段时间来模拟实际工作。
[2]:
import time
import random
def inc(x):
time.sleep(random.random())
return x + 1
def double(x):
time.sleep(random.random())
return 2 * x
def add(x, y):
time.sleep(random.random())
return x + y
我们可以在本地运行它们
[3]:
inc(1)
[3]:
2
或者我们可以将它们提交给 Dask 远程运行。这会立即返回一个指向正在进行的计算的 future,并最终指向存储的结果。
[4]:
future = client.submit(inc, 1) # returns immediately with pending future
future
[4]:
如果您等待一秒钟,然后再次检查 future,您会看到它已经完成。
[5]:
future # scheduler and client talk constantly
[5]:
您可以使用 .result()
方法阻塞计算并收集结果。
[6]:
future.result()
[6]:
2
链式依赖¶
您可以在其他 futures 上提交任务。这将在输入和输出之间创建依赖关系。Dask 将跟踪所有任务的执行,确保下游任务在适当的时间、地点和使用适当的数据运行。
[7]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z
[7]:
[8]:
z.result()
[8]:
6
请注意,我们从未阻塞 x
或 y
,也无需将它们的数据移回我们的 notebook。
提交许多任务¶
至此,我们已经学会了如何在远程运行 Python 函数。当我们添加两样东西时,这将变得非常有用:
我们可以每秒提交数千个任务
任务可以通过消耗 futures 作为输入来相互依赖
我们在一个普通的 Python for 循环中提交许多相互依赖的任务
[9]:
zs = []
[10]:
%%time
for i in range(256):
x = client.submit(inc, i) # x = inc(i)
y = client.submit(double, x) # y = inc(x)
z = client.submit(add, x, y) # z = inc(y)
zs.append(z)
CPU times: user 2.58 s, sys: 92.2 ms, total: 2.68 s
Wall time: 2.57 s
[11]:
total = client.submit(sum, zs)
为了让它运行得更快,可以添加更多核心的额外 workers
(尽管我们仍然只在本地机器上工作,但在使用实际集群时,这会更实用)
[12]:
client.cluster.scale(10) # ask for ten 4-thread workers
自定义计算:树状求和¶
作为一个非平凡算法的示例,考虑经典的树状归约。我们通过嵌套的 for 循环和一些普通的 Python 逻辑来实现它。
finish total single output
^ / \
| c1 c2 neighbors merge
| / \ / \
| b1 b2 b3 b4 neighbors merge
^ / \ / \ / \ / \
start a1 a2 a3 a4 a5 a6 a7 a8 many inputs
[13]:
L = zs
while len(L) > 1:
new_L = []
for i in range(0, len(L), 2):
future = client.submit(add, L[i], L[i + 1]) # add neighbors
new_L.append(future)
L = new_L # swap old list for new
如果您正在查看 仪表板的状态页面,那么您可能想注意两点:
红色条表示 worker 间的通信。它们发生在不同 worker 需要组合其中间值时
开始时有很多并行性,但越到后面越少,因为我们到达树的顶部,那里需要做的工作较少。
或者,您可能想导航到 仪表板的图页面,然后再次运行上面的单元格。您将能够看到任务图在计算过程中演变。
动态构建计算¶
在上面的示例中,我们提前显式指定了任务图。例如,我们知道列表 L
中的前两个 futures 将相加。
然而,有时这并非总是最好的方法,有时您希望在计算进行时动态定义计算。例如,我们可能希望根据哪个 future 先出现来对这些值求和,而不是根据它们最初在列表中放置的顺序。
为此,我们可以使用诸如 as_completed 的操作。
我们建议在运行此计算时查看仪表板的图页面。您应该会看到图在执行过程中自行构建。
[14]:
del future, L, new_L, total # clear out some old work
[15]:
from dask.distributed import as_completed
zs = client.map(inc, zs)
seq = as_completed(zs)
while seq.count() > 1: # at least two futures left
a = next(seq)
b = next(seq)
new = client.submit(add, a, b, priority=1) # add them together
seq.add(new) # add new future back into loop