在线 Notebook

您可以在 在线会话 Binder 或在 Github 上查看

抵御硬件故障

场景:我们有一个集群,其中部分资源是可抢占的。也就是说,在计算过程中,我们必须处理工作节点突然被关闭的情况。虽然这里使用 LocalCluster 进行了演示,但 Dask 针对抢占式资源的弹性在诸如 Dask KubernetesDask Jobqueue 等场景中最有用。

相关文档:https://distributed.dask.org.cn/en/latest/resilience.html#hardware-failures

提高弹性

每当一个工作节点关闭时,调度器会增加分配给(不一定正在计算)该工作节点的所有任务的可疑计数器。每当任务的可疑度超过某个阈值(默认为 3)时,该任务将被视为中断。我们想在少数工作节点上计算许多任务,同时工作节点随机关闭。因此我们预计所有任务的可疑度会快速增长。让我们增加阈值

[1]:
import dask

dask.config.set({'distributed.scheduler.allowed-failures': 100});

所有其他导入

[2]:
from dask.distributed import Client, LocalCluster
from dask import bag as db
import os
import random
from time import sleep

集群

[3]:
cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)
client = Client(cluster)
client
[3]:

客户端

Client-24950cb3-0de0-11ed-9bcc-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

一个简单的工作负载

我们将一系列数字乘以二,添加一些休眠来模拟实际工作,然后通过求和来归约整个加倍后的数字序列。

[4]:
def multiply_by_two(x):
    sleep(0.02)
    return 2 * x
[5]:
N = 400

x = db.from_sequence(range(N), npartitions=N // 2)

mults = x.map(multiply_by_two)

summed = mults.sum()

突然关闭工作节点

让我们将两个工作进程 ID 标记为不可抢占。

[6]:
all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
non_preemptible_workers = all_current_workers[:2]
[7]:
def kill_a_worker():
    preemptible_workers = [
        w.pid for w in cluster.scheduler.workers.values()
        if w.pid not in non_preemptible_workers]
    if preemptible_workers:
        os.kill(random.choice(preemptible_workers), 15)

启动计算并在运行时持续关闭工作节点

[8]:
summed = client.compute(summed)

while not summed.done():
    kill_a_worker()
    sleep(3.0)
2022-07-27 19:13:06,023 - distributed.nanny - WARNING - Restarting worker

检查结果是否匹配

[9]:
print(f"`sum(range({N}))` on cluster: {summed.result()}\t(should be {N * (N-1)})")
`sum(range(400))` on cluster: 159600    (should be 159600)