在线 Notebook

您可以在在线会话中运行此 Notebook Binder,或在 Github 上查看

针对小数据问题的 Scikit-Learn 扩展

此示例演示了 Dask 如何将 scikit-learn 扩展到机器集群以解决 CPU 密集型问题。我们将在一个小数据集上拟合一个大型模型,并在许多超参数上进行网格搜索。

此视频演示了在更大的集群上运行相同的示例。

[1]:
from IPython.display import YouTubeVideo

YouTubeVideo("5Zf6DQaf7jk")
[1]:
[2]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
client
[2]:

客户端

客户端-7bcf1e8b-0de1-11ed-a455-000d3a8f7959

连接方法: Cluster object 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

分布式训练

f1aeecbb269b4feba6c9dd10e300ed38 709cf32990b14a4d8131246bdba79bb8

Scikit-learn 使用 joblib 进行单机并行计算。这使得您可以使用笔记本电脑或工作站的所有核心来训练大多数评估器(任何接受 n_jobs 参数的评估器)。

或者,Scikit-Learn 可以使用 Dask 进行并行计算。这使得您可以使用集群的所有核心来训练这些评估器,而无需显著更改您的代码。

这对于在中等大小的数据集上训练大型模型最有用。当您搜索许多超参数时,或者当您使用包含许多个体评估器的集成方法时,您的模型可能会很大。对于过小的数据集,训练时间通常会很短,以至于集群范围的并行性没有太大帮助。对于过大的数据集(大于单机内存),scikit-learn 评估器可能无法处理(尽管 Dask-ML 提供了处理大于内存数据集的其他方法)。

创建 Scikit-Learn 流水线

[3]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
[4]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
Loading 20 newsgroups dataset for categories:
['alt.atheism', 'talk.religion.misc']
857 documents
2 categories

我们将定义一个小型流水线,它将文本特征提取与简单分类器结合起来。

[5]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

并行、分布式预测

有时,您在一个小数据集上进行训练,但需要对更大批量的数据进行预测。在这种情况下,您希望您的评估器能够处理用于训练的 NumPy 数组和 pandas DataFrame,以及用于预测的 dask 数组或 DataFrame。dask_ml.wrappers.ParallelPostFit https://ml.dask.org.cn/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit__ 正提供了这种能力。它是一个元评估器。它在训练期间不做任何事情;底层评估器(可能是 scikit-learn 评估器)可能会在单机内存中。但是像 predictscore 等任务是并行和分布式的。

大多数时候,使用 ParallelPostFit 就像包装原始评估器一样简单。当在 GridSearch 内部使用时,您需要更新参数的键,就像使用任何元评估器一样。唯一的复杂之处在于将 ParallelPostFit 与另一个元评估器(如 GridSearchCV)一起使用。在这种情况下,您需要为参数名称加上前缀 estimator__

[9]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit

我们将加载用于训练的小型 NumPy 数组。

[10]:
X, y = load_digits(return_X_y=True)
X.shape
[10]:
(1797, 64)
[11]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, cv=3)

并像往常一样进行拟合。

[12]:
grid_search.fit(X, y)
[12]:
GridSearchCV(cv=3, estimator=ParallelPostFit(estimator=SVC(random_state=0)),
             param_grid={'estimator__C': [0.01, 1.0, 10]})

我们将通过多次复制训练数据来模拟一个大型 dask 数组。实际上,您会从文件系统中加载这些数据。

[13]:
import dask.array as da
[14]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X
[14]:
数组 分块
字节 8.77 MiB 898.50 kiB
形状 (17970, 64) (1797, 64)
计数 11 个任务 10 个分块
类型 float64 numpy.ndarray
64 17970

predictpredict_proba 这样的操作返回的是 dask 数组,而不是 NumPy 数组。当您计算时,工作将并行执行,可以在核外或在集群上分布式执行。

[15]:
predicted = grid_search.predict(big_X)
predicted
[15]:
数组 分块
字节 140.39 kiB 14.04 kiB
形状 (17970,) (1797,)
计数 21 个任务 10 个分块
类型 int64 numpy.ndarray
17970 1

此时,预测结果可以写入磁盘,或者在返回客户端之前进行聚合。