异步计算:Web 服务器 + Dask
目录
实时 Notebook
您可以在实时会话中运行此 Notebook 或在Github 上查看。
异步计算:Web 服务器 + Dask¶
让我们设想一个简单的 Web 服务器,它既服务快速加载的页面,也对加载较慢的页面执行一些计算。在我们的例子中,这将是一个简单的斐波那契数列服务应用,但您可以想象将 fib
函数替换为在某些输入数据上运行机器学习模型、从数据库获取结果等等。
[1]:
import tornado.ioloop
import tornado.web
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
def get(self, n):
result = fib(int(n))
self.write(str(result))
class FastHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello!")
def make_app():
return tornado.web.Application([
(r"/fast", FastHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(8000)
[1]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3040>
速度¶
我们知道用户会将快速响应时间与权威内容和信任联系起来,因此我们想衡量我们的页面加载速度。我们特别有兴趣在许多同时加载的情况下进行此操作,模拟我们的 Web 服务器在为许多用户提供服务时的响应情况
[2]:
import asyncio
import tornado.httpclient
client = tornado.httpclient.AsyncHTTPClient()
from time import time
async def measure(url, n=100):
""" Get url n times concurrently. Print duration. """
start = time()
futures = [client.fetch(url) for i in range(n)]
results = await asyncio.gather(*futures)
end = time()
print(url, ', %d simultaneous requests, ' % n, 'total time: ', (end - start))
计时¶
我们看到
Tornado 的往返时间约为 3-5 毫秒
它可以在大约 100 毫秒内运行 100 个这样的查询,因此发生了一些不错的并发
调用 fib 需要一段时间
调用 fib 100 次大约需要 100 倍的时间,并行性不是很高
[3]:
await measure('http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests, total time: 0.005836009979248047
[4]:
await measure('http://localhost:8000/fast', n=100)
http://localhost:8000/fast , 100 simultaneous requests, total time: 0.13756465911865234
[5]:
await measure('http://localhost:8000/fib/28', n=1)
http://localhost:8000/fib/28 , 1 simultaneous requests, total time: 0.11277055740356445
[6]:
await measure('http://localhost:8000/fib/28', n=100)
http://localhost:8000/fib/28 , 100 simultaneous requests, total time: 11.173585653305054
阻塞式异步¶
在下面的示例中,我们看到对慢速 fib/
路由的一次调用不幸地会阻塞其他许多更快的请求
[7]:
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
http://localhost:8000/fib/35 , 1 simultaneous requests, total time: 3.2470662593841553
http://localhost:8000/fast , 1 simultaneous requests, total time: 3.246363639831543
讨论¶
这里有两个问题/机会
我们所有的
fib
调用都是独立的,我们希望使用多个核心或附近的集群并行运行这些计算。我们的慢速计算密集型
fib
请求可能会妨碍我们的快速请求。一个慢速用户会影响其他人。
使用 Dask 进行进程外异步计算¶
为了解决这两个问题,我们将使用 Dask 将计算卸载到其他进程或计算机。由于 Dask 是一个异步框架,它可以很好地与 Tornado 或 Asyncio 集成。
[8]:
from dask.distributed import Client
dask_client = await Client(asynchronous=True) # use local processes for now
dask_client
[8]:
客户端
Client-62e54cd7-0de0-11ed-9dd9-000d3a8f7959
连接方法: Cluster object | 集群类型: distributed.LocalCluster |
仪表板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
aada52c9
仪表板: http://127.0.0.1:8787/status | 工作节点 2 |
总线程数 2 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-f94995f6-d115-4423-8a61-bc03a4ba55f2
通讯: tcp://127.0.0.1:42751 | 工作节点 2 |
仪表板: http://127.0.0.1:8787/status | 总线程数 2 |
启动于: 刚刚 | 总内存: 6.78 GiB |
工作节点
工作节点:0
通讯: tcp://127.0.0.1:39891 | 总线程数 1 |
仪表板: http://127.0.0.1:43005/status | 内存: 3.39 GiB |
Nanny: tcp://127.0.0.1:46657 | |
本地目录: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-589y0ffl |
工作节点:1
通讯: tcp://127.0.0.1:35639 | 总线程数 1 |
仪表板: http://127.0.0.1:33063/status | 内存: 3.39 GiB |
Nanny: tcp://127.0.0.1:43917 | |
本地目录: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-27symtet |
[9]:
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
async def get(self, n):
future = dask_client.submit(fib, int(n)) # submit work to happen elsewhere
result = await future
self.write(str(result))
class MainHandler(tornado.web.RequestHandler):
async def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application([
(r"/fast", MainHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(9000)
[9]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3070>
性能变化¶
通过将 fib 计算卸载到 Dask,我们实现了两件事
并行计算¶
我们现在可以在更短的时间内支持更多的请求。下面的实验同时向 20 个请求询问 fib(28)
。在旧版本中,我们花费几秒钟按顺序计算这些(最后一个请求的人需要等待几秒钟,直到他们的浏览器完成)。在新版本中,其中许多可以并行计算,因此每个人都在几百毫秒内得到答案。
[10]:
# Before parallelism
await measure('http://localhost:8000/fib/28', n=20)
http://localhost:8000/fib/28 , 20 simultaneous requests, total time: 2.282106399536133
[11]:
# After parallelism
await measure('http://localhost:9000/fib/28', n=20)
http://localhost:9000/fib/28 , 20 simultaneous requests, total time: 0.44431519508361816
异步计算¶
以前,当一个请求忙于计算 fib(...)
时,Tornado 会被阻塞。它无法处理任何其他请求。当我们的服务器同时提供昂贵的计算和廉价的计算时,这尤其成问题。廉价请求会被不必要地挂起。
由于 Dask 能够与 Tornado 或 Asyncio 等异步系统集成,我们的 Web 服务器可以自由地在许多请求之间跳转,即使计算在后台进行。在下面的示例中,我们看到即使慢速计算先开始,快速计算也只用了几毫秒就返回了结果。
[12]:
# Before async
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
await a
http://localhost:8000/fib/35 , 1 simultaneous requests, total time: 3.244072437286377
http://localhost:8000/fast , 1 simultaneous requests, total time: 3.244060516357422
[13]:
# After async
a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))
await b
await a
http://localhost:9000/fast , 1 simultaneous requests, total time: 0.009461402893066406
http://localhost:9000/fib/35 , 1 simultaneous requests, total time: 3.4255080223083496
其他选项¶
在这些情况下,人们现在倾向于使用 concurrent.futures 或 Celery。
concurrent.futures 允许在单台机器上轻松实现并行性,并能很好地集成到异步框架中。其 API 正是我们上面展示的(Dask 实现了 concurrent.futures API)。然而,concurrent.futures 不容易扩展到集群。
Celery 更容易扩展到多台机器,但延迟更高,向下扩展不够好,并且需要一些努力才能集成到异步框架中(或者至少这是我的理解,我在这方面的经验比较浅)
在这种情况下,Dask 提供了两者的部分优势。它在常见的单机场景下易于设置和使用,但也可以扩展到集群。它可以很好地与异步框架集成,并且只增加非常小的延迟。
[14]:
async def f():
start = time()
result = await dask_client.submit(lambda x: x + 1, 10)
end = time()
print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))
await f()
Roundtrip latency: 13.69 ms