在线运行 Notebook

您可以在在线会话中运行此 notebook Binder,或者在 Github 上查看

使用 Numba 进行模板计算

Numba Logo

本 notebook 将高性能 Python 编译器 Numba 与 Dask 数组结合使用。

我们将重点展示 Numba 的两个特性以及它们如何与 Dask 集成:

  1. Numba 的stencil 装饰器

  2. NumPy 的广义通用函数

这最初作为一篇博文发布在此处

Numba Stencils 简介

许多数组计算函数仅在数组的局部区域上操作。这在图像处理、信号处理、模拟、微分方程求解、异常检测、时间序列分析等领域很常见。通常我们编写的代码如下所示:

[1]:
def _smooth(x):
    out = np.empty_like(x)
    for i in range(1, x.shape[0] - 1):
        for j in range(1, x.shape[1] - 1):
            out[i, j] = (x[i + -1, j + -1] + x[i + -1, j + 0] + x[i + -1, j + 1] +
                         x[i +  0, j + -1] + x[i +  0, j + 0] + x[i +  0, j + 1] +
                         x[i +  1, j + -1] + x[i +  1, j + 0] + x[i +  1, j + 1]) // 9

    return out

或者类似的东西。numba.stencil 装饰器使这更容易编写。您只需写下每个元素会发生什么,Numba 会处理其余部分。

[2]:
import numba

@numba.stencil
def _smooth(x):
    return (x[-1, -1] + x[-1, 0] + x[-1, 1] +
            x[ 0, -1] + x[ 0, 0] + x[ 0, 1] +
            x[ 1, -1] + x[ 1, 0] + x[ 1, 1]) // 9

当我们在 NumPy 数组上运行此函数时,我们发现它很慢,以 Python 的速度运行。

[3]:
import numpy as np
x = np.ones((100, 100))

%timeit _smooth(x)
368 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

但如果使用 Numba 对此函数进行 JIT 编译,它就会运行得更快。

[4]:
@numba.njit
def smooth(x):
    return _smooth(x)

%timeit smooth(x)
175 µs ± 25.4 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

对于那些计算过的人来说,这快了 1000 倍以上!

注意:此函数已存在于 ``scipy.ndimage.uniform_filter`` 中,其运行速度相同。

Dask 数组

在这些应用中,人们通常有许多这样的数组,并且希望将此函数应用于所有数组。原则上,他们可以使用 for 循环来完成此操作。

from glob import glob
import skimage.io

for fn in glob('/path/to/*.png'):
    img = skimage.io.imread(fn)
    out = smooth(img)
    skimage.io.imsave(fn.replace('.png', '.out.png'), out)

如果他们想并行执行此操作,可以使用 multiprocessing 或 concurrent.futures 模块。如果想在集群上执行此操作,则可以使用 PySpark 或其他一些系统重写代码。

或者,他们可以使用 Dask 数组,它将处理管道化和并行化(单机或集群),同时仍然看起来大部分像一个 NumPy 数组。

import dask_image
x = dask_image.imread('/path/to/*.png')  # a large lazy array of all of our images
y = x.map_blocks(smooth, dtype='int8')

然后,由于 Dask 数组的每个块都只是 NumPy 数组,我们可以使用map_blocks函数将此函数应用于所有图像,然后将其保存。

这很好,但让我们更进一步,讨论 NumPy 的广义通用函数。

因为我们手边没有图像堆栈,我们将创建一个具有类似结构的随机数组。

[5]:
import dask.array as da
x = da.random.randint(0, 127, size=(10000, 1000, 1000), chunks=('64 MB', None, None), dtype='int8')
x
[5]:
数组
字节 9.31 GiB 47.68 MiB
形状 (10000, 1000, 1000) (50, 1000, 1000)
数量 200 个任务 200 个块
类型 int8 numpy.ndarray
1000 1000 10000

广义通用函数

Numba 文档: https://numba.pydata.org/numba-doc/dev/user/vectorize.html

NumPy 文档: https://numpy.com.cn/doc/stable/reference/c-api/generalized-ufuncs.html

广义通用函数(gufunc)是一个用类型和维度信息进行注解的普通函数。例如,我们可以将 smooth 函数重新定义为 gufunc,如下所示:

[6]:
@numba.guvectorize(
    [(numba.int8[:, :], numba.int8[:, :])],
    '(n, m) -> (n, m)'
)
def smooth(x, out):
    out[:] = _smooth(x)

此函数知道它接收一个二维 int8 数组,并生成一个具有相同维度的二维 int8 数组。

这种注解是一个小改变,但它为 Dask 等其他系统提供了足够的信息以智能地使用它。我们不再调用像 map_blocks 这样的函数,而是可以直接使用该函数,就像我们的 Dask 数组只是一个非常大的 NumPy 数组一样。

[7]:
# Before gufuncs
y = x.map_blocks(smooth, dtype='int8')

# After gufuncs
y = smooth(x)
y
[7]:
数组
字节 9.31 GiB 47.68 MiB
形状 (10000, 1000, 1000) (50, 1000, 1000)
数量 800 个任务 200 个块
类型 int8 numpy.ndarray
1000 1000 10000

这很好。如果您使用 gufunc 语义编写库代码,那么该代码就可以直接与 Dask 等系统配合使用,而无需内置对并行计算的显式支持。这极大地简化了用户的工作。

启动 Dask 客户端以查看仪表盘

启动 Dask 客户端是可选的。它将启动仪表盘,这对于了解计算详情很有用。

[8]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4,
                n_workers=1,
                processes=False,
                memory_limit='4GB')
client
[8]:

客户端

客户端-bb5807ea-0de0-11ed-a0f1-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表盘: http://10.1.1.64:8787/status

集群信息

[9]:
y.max().compute()
[9]:
122

GPU 版本

Numba 还支持在兼容的 GPU 设备上使用 CUDA 进行 JIT 编译。

使用 numba.cuda.jit,这在单块 V100 GPU 上比 CPU 提速约 200 倍。

import numba.cuda

@numba.cuda.jit
def smooth_gpu(x, out):
    i, j = cuda.grid(2)
    n, m = x.shape
    if 1 <= i < n - 1 and 1 <= j < m - 1:
        out[i, j] = (x[i - 1, j - 1] + x[i - 1, j] + x[i - 1, j + 1] +
                     x[i    , j - 1] + x[i    , j] + x[i    , j + 1] +
                     x[i + 1, j - 1] + x[i + 1, j] + x[i + 1, j + 1]) // 9

import cupy, math

x_gpu = cupy.ones((10000, 10000), dtype='int8')
out_gpu = cupy.zeros((10000, 10000), dtype='int8')

# I copied the four lines below from the Numba docs
threadsperblock = (16, 16)
blockspergrid_x = math.ceil(x_gpu.shape[0] / threadsperblock[0])
blockspergrid_y = math.ceil(x_gpu.shape[1] / threadsperblock[1])
blockspergrid = (blockspergrid_x, blockspergrid_y)

smooth_gpu[blockspergrid, threadsperblock](x_gpu, out_gpu)

完整 notebook 在此处