Async/Await 和非阻塞执行

在线 Notebook

您可以在 在线会话 中运行此 notebook Binder在 Github 上 查看。

Async/Await 和非阻塞执行

Dask 使用 TornadoAsyncio 框架与并发应用程序原生集成,并且可以使用 Python 的 asyncawait 关键字。

此示例展示了如何在异步模式下启动 Dask Client 的一个小例子。

asynchronous=True 参数

如果您传递 asynchronous=True 参数,Dask LocalCluster 和 Client 对象可以在 async-await 模式下运行。

[1]:
from dask.distributed import Client
client = await Client(asynchronous=True)
[2]:
def inc(x: int) -> int:
    return x + 1

future = client.submit(inc, 10)
future
[2]:
Future: inc 状态: 待处理, 类型: NoneType, 键: inc-cf183ed345225f36fe30323c57a93811
[3]:
await future
[3]:
11

集合

请注意,像 .compute() 方法这样的阻塞操作不适用于异步模式。您需要改用 Client.compute 方法。

[4]:
import dask
df = dask.datasets.timeseries()
df
[4]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 个任务
[5]:
df = df.persist()             # persist is non-blocking, so it's ok
[6]:
total = df[['x', 'y']].sum()  # lazy computations are also ok
[7]:
# total.compute()             # but compute is bad, because compute blocks until done
[8]:
future = client.compute(total)
future
[8]:
Future: finalize 状态: 待处理, 类型: NoneType, 键: finalize-c005c96bb73e558555c311bfabfc6ba2
[9]:
await future
[9]:
x    239.172407
y    762.941155
dtype: float64

在脚本中

在 Jupyter 中运行 async/await 代码有点特殊。Jupyter 已经运行着一个事件循环,因此可以直接在其中轻松使用 async/await 语法。但在正常的 Python 脚本中并非如此。这是一个示例脚本,它应该在正常的 Python 解释器中或作为脚本运行。

import asyncio
from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def f():
    async with Client(asynchronous=True) as client:
        future = client.submit(inc, 10)
        result = await future
        print(result)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(f())