抵御硬件故障
目录
在线 Notebook
您可以在 在线会话 或在 Github 上查看。
抵御硬件故障¶
场景:我们有一个集群,其中部分资源是可抢占的。也就是说,在计算过程中,我们必须处理工作节点突然被关闭的情况。虽然这里使用 LocalCluster
进行了演示,但 Dask 针对抢占式资源的弹性在诸如 Dask Kubernetes 或 Dask Jobqueue 等场景中最有用。
相关文档:https://distributed.dask.org.cn/en/latest/resilience.html#hardware-failures
提高弹性¶
每当一个工作节点关闭时,调度器会增加分配给(不一定正在计算)该工作节点的所有任务的可疑计数器。每当任务的可疑度超过某个阈值(默认为 3)时,该任务将被视为中断。我们想在少数工作节点上计算许多任务,同时工作节点随机关闭。因此我们预计所有任务的可疑度会快速增长。让我们增加阈值
[1]:
import dask
dask.config.set({'distributed.scheduler.allowed-failures': 100});
所有其他导入¶
[2]:
from dask.distributed import Client, LocalCluster
from dask import bag as db
import os
import random
from time import sleep
集群¶
[3]:
cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)
client = Client(cluster)
client
[3]:
客户端
Client-24950cb3-0de0-11ed-9bcc-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
22f50347
仪表板: http://127.0.0.1:8787/status | 工作节点 4 |
总线程数 4 | 总内存: 1.49 GiB |
状态: 运行中 | 使用进程: 是 |
调度器信息
调度器
Scheduler-0999d4fd-0cdb-4600-8ce3-c52d841558f8
通信地址: tcp://127.0.0.1:35393 | 工作节点 4 |
仪表板: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 1.49 GiB |
工作节点
工作节点:0
通信地址: tcp://127.0.0.1:46159 | 总线程数 1 |
仪表板: http://127.0.0.1:36825/status | 内存: 381.47 MiB |
Nanny: tcp://127.0.0.1:39639 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-icrn9gq_ |
工作节点:1
通信地址: tcp://127.0.0.1:44753 | 总线程数 1 |
仪表板: http://127.0.0.1:46199/status | 内存: 381.47 MiB |
Nanny: tcp://127.0.0.1:34707 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-ke1b20m1 |
工作节点:2
通信地址: tcp://127.0.0.1:44591 | 总线程数 1 |
仪表板: http://127.0.0.1:37637/status | 内存: 381.47 MiB |
Nanny: tcp://127.0.0.1:42199 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-t8f4iili |
工作节点:3
通信地址: tcp://127.0.0.1:41479 | 总线程数 1 |
仪表板: http://127.0.0.1:38249/status | 内存: 381.47 MiB |
Nanny: tcp://127.0.0.1:38963 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-qmr65by3 |
一个简单的工作负载¶
我们将一系列数字乘以二,添加一些休眠来模拟实际工作,然后通过求和来归约整个加倍后的数字序列。
[4]:
def multiply_by_two(x):
sleep(0.02)
return 2 * x
[5]:
N = 400
x = db.from_sequence(range(N), npartitions=N // 2)
mults = x.map(multiply_by_two)
summed = mults.sum()
突然关闭工作节点¶
让我们将两个工作进程 ID 标记为不可抢占。
[6]:
all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
non_preemptible_workers = all_current_workers[:2]
[7]:
def kill_a_worker():
preemptible_workers = [
w.pid for w in cluster.scheduler.workers.values()
if w.pid not in non_preemptible_workers]
if preemptible_workers:
os.kill(random.choice(preemptible_workers), 15)
启动计算并在运行时持续关闭工作节点¶
[8]:
summed = client.compute(summed)
while not summed.done():
kill_a_worker()
sleep(3.0)
2022-07-27 19:13:06,023 - distributed.nanny - WARNING - Restarting worker
检查结果是否匹配¶
[9]:
print(f"`sum(range({N}))` on cluster: {summed.result()}\t(should be {N * (N-1)})")
`sum(range(400))` on cluster: 159600 (should be 159600)