Async/Await 和非阻塞执行
目录
在线 Notebook
您可以在 在线会话 中运行此 notebook 或 在 Github 上 查看。
Async/Await 和非阻塞执行¶
Dask 使用 Tornado 或 Asyncio 框架与并发应用程序原生集成,并且可以使用 Python 的 async
和 await
关键字。
此示例展示了如何在异步模式下启动 Dask Client 的一个小例子。
asynchronous=True
参数¶
如果您传递 asynchronous=True
参数,Dask LocalCluster 和 Client 对象可以在 async-await 模式下运行。
[1]:
from dask.distributed import Client
client = await Client(asynchronous=True)
[2]:
def inc(x: int) -> int:
return x + 1
future = client.submit(inc, 10)
future
[2]:
Future: inc 状态: 待处理, 类型: NoneType, 键: inc-cf183ed345225f36fe30323c57a93811
[3]:
await future
[3]:
11
集合¶
请注意,像 .compute()
方法这样的阻塞操作不适用于异步模式。您需要改用 Client.compute
方法。
[4]:
import dask
df = dask.datasets.timeseries()
df
[4]:
Dask DataFrame 结构
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
Dask Name: make-timeseries, 30 个任务
[5]:
df = df.persist() # persist is non-blocking, so it's ok
[6]:
total = df[['x', 'y']].sum() # lazy computations are also ok
[7]:
# total.compute() # but compute is bad, because compute blocks until done
[8]:
future = client.compute(total)
future
[8]:
Future: finalize 状态: 待处理, 类型: NoneType, 键: finalize-c005c96bb73e558555c311bfabfc6ba2
[9]:
await future
[9]:
x 239.172407
y 762.941155
dtype: float64
在脚本中¶
在 Jupyter 中运行 async/await 代码有点特殊。Jupyter 已经运行着一个事件循环,因此可以直接在其中轻松使用 async/await 语法。但在正常的 Python 脚本中并非如此。这是一个示例脚本,它应该在正常的 Python 解释器中或作为脚本运行。
import asyncio
from dask.distributed import Client
def inc(x: int) -> int:
return x + 1
async def f():
async with Client(asynchronous=True) as client:
future = client.submit(inc, 10)
result = await future
print(result)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(f())