针对小数据问题的 Scikit-Learn 扩展
目录
在线 Notebook
您可以在在线会话中运行此 Notebook ,或在 Github 上查看。
针对小数据问题的 Scikit-Learn 扩展¶
此示例演示了 Dask 如何将 scikit-learn 扩展到机器集群以解决 CPU 密集型问题。我们将在一个小数据集上拟合一个大型模型,并在许多超参数上进行网格搜索。
此视频演示了在更大的集群上运行相同的示例。
[1]:
from IPython.display import YouTubeVideo
YouTubeVideo("5Zf6DQaf7jk")
[1]:
[2]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
client
[2]:
客户端
客户端-7bcf1e8b-0de1-11ed-a455-000d3a8f7959
连接方法: Cluster object | 集群类型: distributed.LocalCluster |
仪表盘: http://127.0.0.1:8787/status |
集群信息
LocalCluster
4980102c
仪表盘: http://127.0.0.1:8787/status | 工作节点 4 |
总线程数 4 | 总内存: 7.45 GiB |
状态: running | 使用进程: True |
调度器信息
调度器
调度器-89c11552-5d0d-4279-8917-35605cc96a56
通信: tcp://127.0.0.1:38375 | 工作节点 4 |
仪表盘: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: Just now | 总内存: 7.45 GiB |
工作节点
工作节点:0
通信: tcp://127.0.0.1:43429 | 总线程数 1 |
仪表盘: http://127.0.0.1:46693/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:35293 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-gt1p0g05 |
工作节点:1
通信: tcp://127.0.0.1:46751 | 总线程数 1 |
仪表盘: http://127.0.0.1:35987/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:40509 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-sq4p4isk |
工作节点:2
通信: tcp://127.0.0.1:35545 | 总线程数 1 |
仪表盘: http://127.0.0.1:46021/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:38883 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-2iddzp9o |
工作节点:3
通信: tcp://127.0.0.1:43855 | 总线程数 1 |
仪表盘: http://127.0.0.1:37433/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:32787 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-gfst4qpj |
分布式训练¶
Scikit-learn 使用 joblib 进行单机并行计算。这使得您可以使用笔记本电脑或工作站的所有核心来训练大多数评估器(任何接受 n_jobs
参数的评估器)。
或者,Scikit-Learn 可以使用 Dask 进行并行计算。这使得您可以使用集群的所有核心来训练这些评估器,而无需显著更改您的代码。
这对于在中等大小的数据集上训练大型模型最有用。当您搜索许多超参数时,或者当您使用包含许多个体评估器的集成方法时,您的模型可能会很大。对于过小的数据集,训练时间通常会很短,以至于集群范围的并行性没有太大帮助。对于过大的数据集(大于单机内存),scikit-learn 评估器可能无法处理(尽管 Dask-ML 提供了处理大于内存数据集的其他方法)。
创建 Scikit-Learn 流水线¶
[3]:
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
[4]:
# Scale Up: set categories=None to use all the categories
categories = [
'alt.atheism',
'talk.religion.misc',
]
print("Loading 20 newsgroups dataset for categories:")
print(categories)
data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
Loading 20 newsgroups dataset for categories:
['alt.atheism', 'talk.religion.misc']
857 documents
2 categories
我们将定义一个小型流水线,它将文本特征提取与简单分类器结合起来。
[5]:
pipeline = Pipeline([
('vect', HashingVectorizer()),
('tfidf', TfidfTransformer()),
('clf', SGDClassifier(max_iter=1000)),
])
定义参数搜索网格¶
在某些参数上进行网格搜索。
[6]:
parameters = {
'tfidf__use_idf': (True, False),
'tfidf__norm': ('l1', 'l2'),
'clf__alpha': (0.00001, 0.000001),
# 'clf__penalty': ('l2', 'elasticnet'),
# 'clf__n_iter': (10, 50, 80),
}
[7]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)
通常情况下,我们会这样写来进行拟合
grid_search.fit(data.data, data.target)
这将使用默认的 joblib 后端(多进程)进行并行计算。要使用 Dask 分布式后端(它将使用机器集群来训练模型),请在 parallel_backend
上下文中执行拟合操作。
[8]:
import joblib
with joblib.parallel_backend('dask'):
grid_search.fit(data.data, data.target)
Fitting 3 folds for each of 8 candidates, totalling 24 fits
如果您在拟合期间打开了分布式仪表盘,您会注意到每个工作节点都执行了部分拟合任务。
并行、分布式预测¶
有时,您在一个小数据集上进行训练,但需要对更大批量的数据进行预测。在这种情况下,您希望您的评估器能够处理用于训练的 NumPy 数组和 pandas DataFrame,以及用于预测的 dask 数组或 DataFrame。dask_ml.wrappers.ParallelPostFit
https://ml.dask.org.cn/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit__ 正提供了这种能力。它是一个元评估器。它在训练期间不做任何事情;底层评估器(可能是 scikit-learn 评估器)可能会在单机内存中。但是像 predict
、score
等任务是并行和分布式的。
大多数时候,使用 ParallelPostFit
就像包装原始评估器一样简单。当在 GridSearch 内部使用时,您需要更新参数的键,就像使用任何元评估器一样。唯一的复杂之处在于将 ParallelPostFit
与另一个元评估器(如 GridSearchCV
)一起使用。在这种情况下,您需要为参数名称加上前缀 estimator__
。
[9]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit
我们将加载用于训练的小型 NumPy 数组。
[10]:
X, y = load_digits(return_X_y=True)
X.shape
[10]:
(1797, 64)
[11]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))
param_grid = {
# use estimator__param instead of param
'estimator__C': [0.01, 1.0, 10],
}
grid_search = GridSearchCV(svc, param_grid, cv=3)
并像往常一样进行拟合。
[12]:
grid_search.fit(X, y)
[12]:
GridSearchCV(cv=3, estimator=ParallelPostFit(estimator=SVC(random_state=0)),
param_grid={'estimator__C': [0.01, 1.0, 10]})
我们将通过多次复制训练数据来模拟一个大型 dask 数组。实际上,您会从文件系统中加载这些数据。
[13]:
import dask.array as da
[14]:
big_X = da.concatenate([
da.from_array(X, chunks=X.shape)
for _ in range(10)
])
big_X
[14]:
|
像 predict
或 predict_proba
这样的操作返回的是 dask 数组,而不是 NumPy 数组。当您计算时,工作将并行执行,可以在核外或在集群上分布式执行。
[15]:
predicted = grid_search.predict(big_X)
predicted
[15]:
|
此时,预测结果可以写入磁盘,或者在返回客户端之前进行聚合。