实时 Notebook

您可以在实时会话中运行此 Notebook Binder 或在Github 上查看

b31daa0c7cd64076b553c0869e7eeb5e

异步计算:Web 服务器 + Dask

让我们设想一个简单的 Web 服务器,它既服务快速加载的页面,也对加载较慢的页面执行一些计算。在我们的例子中,这将是一个简单的斐波那契数列服务应用,但您可以想象将 fib 函数替换为在某些输入数据上运行机器学习模型、从数据库获取结果等等。

[1]:
import tornado.ioloop
import tornado.web

def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)


class FibHandler(tornado.web.RequestHandler):
    def get(self, n):
        result = fib(int(n))
        self.write(str(result))


class FastHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello!")


def make_app():
    return tornado.web.Application([
        (r"/fast", FastHandler),
        (r"/fib/(\d+)", FibHandler),
    ])


app = make_app()
app.listen(8000)
[1]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3040>

速度

我们知道用户会将快速响应时间与权威内容和信任联系起来,因此我们想衡量我们的页面加载速度。我们特别有兴趣在许多同时加载的情况下进行此操作,模拟我们的 Web 服务器在为许多用户提供服务时的响应情况

[2]:
import asyncio
import tornado.httpclient

client = tornado.httpclient.AsyncHTTPClient()

from time import time

async def measure(url, n=100):
    """ Get url n times concurrently.  Print duration. """
    start = time()
    futures = [client.fetch(url) for i in range(n)]
    results = await asyncio.gather(*futures)
    end = time()
    print(url, ', %d simultaneous requests, ' %  n, 'total time: ', (end - start))

计时

我们看到

  1. Tornado 的往返时间约为 3-5 毫秒

  2. 它可以在大约 100 毫秒内运行 100 个这样的查询,因此发生了一些不错的并发

  3. 调用 fib 需要一段时间

  4. 调用 fib 100 次大约需要 100 倍的时间,并行性不是很高

[3]:
await measure('http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests,  total time:  0.005836009979248047
[4]:
await measure('http://localhost:8000/fast', n=100)
http://localhost:8000/fast , 100 simultaneous requests,  total time:  0.13756465911865234
[5]:
await measure('http://localhost:8000/fib/28', n=1)
http://localhost:8000/fib/28 , 1 simultaneous requests,  total time:  0.11277055740356445
[6]:
await measure('http://localhost:8000/fib/28', n=100)
http://localhost:8000/fib/28 , 100 simultaneous requests,  total time:  11.173585653305054

阻塞式异步

在下面的示例中,我们看到对慢速 fib/ 路由的一次调用不幸地会阻塞其他许多更快的请求

[7]:
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
http://localhost:8000/fib/35 , 1 simultaneous requests,  total time:  3.2470662593841553
http://localhost:8000/fast , 1 simultaneous requests,  total time:  3.246363639831543

讨论

这里有两个问题/机会

  1. 我们所有的 fib 调用都是独立的,我们希望使用多个核心或附近的集群并行运行这些计算。

  2. 我们的慢速计算密集型 fib 请求可能会妨碍我们的快速请求。一个慢速用户会影响其他人。

使用 Dask 进行进程外异步计算

为了解决这两个问题,我们将使用 Dask 将计算卸载到其他进程或计算机。由于 Dask 是一个异步框架,它可以很好地与 Tornado 或 Asyncio 集成。

[8]:
from dask.distributed import Client

dask_client = await Client(asynchronous=True)  # use local processes for now
dask_client
[8]:

客户端

Client-62e54cd7-0de0-11ed-9dd9-000d3a8f7959

连接方法: Cluster object 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

[9]:

def fib(n): if n < 2: return n else: return fib(n - 1) + fib(n - 2) class FibHandler(tornado.web.RequestHandler): async def get(self, n): future = dask_client.submit(fib, int(n)) # submit work to happen elsewhere result = await future self.write(str(result)) class MainHandler(tornado.web.RequestHandler): async def get(self): self.write("Hello, world") def make_app(): return tornado.web.Application([ (r"/fast", MainHandler), (r"/fib/(\d+)", FibHandler), ]) app = make_app() app.listen(9000)
[9]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3070>

性能变化

通过将 fib 计算卸载到 Dask,我们实现了两件事

并行计算

我们现在可以在更短的时间内支持更多的请求。下面的实验同时向 20 个请求询问 fib(28)。在旧版本中,我们花费几秒钟按顺序计算这些(最后一个请求的人需要等待几秒钟,直到他们的浏览器完成)。在新版本中,其中许多可以并行计算,因此每个人都在几百毫秒内得到答案。

[10]:
# Before parallelism
await measure('http://localhost:8000/fib/28', n=20)
http://localhost:8000/fib/28 , 20 simultaneous requests,  total time:  2.282106399536133
[11]:
# After parallelism
await measure('http://localhost:9000/fib/28', n=20)
http://localhost:9000/fib/28 , 20 simultaneous requests,  total time:  0.44431519508361816

异步计算

以前,当一个请求忙于计算 fib(...) 时,Tornado 会被阻塞。它无法处理任何其他请求。当我们的服务器同时提供昂贵的计算和廉价的计算时,这尤其成问题。廉价请求会被不必要地挂起。

由于 Dask 能够与 Tornado 或 Asyncio 等异步系统集成,我们的 Web 服务器可以自由地在许多请求之间跳转,即使计算在后台进行。在下面的示例中,我们看到即使慢速计算先开始,快速计算也只用了几毫秒就返回了结果。

[12]:
# Before async
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
await a
http://localhost:8000/fib/35 , 1 simultaneous requests,  total time:  3.244072437286377
http://localhost:8000/fast , 1 simultaneous requests,  total time:  3.244060516357422
[13]:
# After async
a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))
await b
await a
http://localhost:9000/fast , 1 simultaneous requests,  total time:  0.009461402893066406
http://localhost:9000/fib/35 , 1 simultaneous requests,  total time:  3.4255080223083496

其他选项

在这些情况下,人们现在倾向于使用 concurrent.futuresCelery

  • concurrent.futures 允许在单台机器上轻松实现并行性,并能很好地集成到异步框架中。其 API 正是我们上面展示的(Dask 实现了 concurrent.futures API)。然而,concurrent.futures 不容易扩展到集群。

  • Celery 更容易扩展到多台机器,但延迟更高,向下扩展不够好,并且需要一些努力才能集成到异步框架中(或者至少这是我的理解,我在这方面的经验比较浅)

在这种情况下,Dask 提供了两者的部分优势。它在常见的单机场景下易于设置和使用,但也可以扩展到集群。它可以很好地与异步框架集成,并且只增加非常小的延迟。

[14]:
async def f():
    start = time()
    result = await dask_client.submit(lambda x: x + 1, 10)
    end = time()
    print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))

await f()
Roundtrip latency: 13.69 ms