逐步训练大型数据集
目录
实时 Notebook
你可以在实时会话中运行此 Notebook 或在 Github 上查看。
逐步训练大型数据集¶
我们可以一次训练一个批次的大型数据集上的模型。许多 Scikit-Learn 估计器实现了 partial_fit
方法,以支持批量增量学习。
est = SGDClassifier(...)
est.partial_fit(X_train_1, y_train_1)
est.partial_fit(X_train_2, y_train_2)
...
Scikit-Learn 文档在其用户指南中更深入地讨论了这种方法。
本 Notebook 演示了 Dask-ML 的 Incremental
元估计器的用法,它自动化了在 Dask 数组和 DataFrame 上使用 Scikit-Learn 的 partial_fit
的过程。Scikit-Learn 处理所有计算,而 Dask 根据需要处理数据管理、加载和移动数据批次。这使得可以在许多机器上分布式处理大型数据集,或处理无法完全放入内存的数据集,所有这些都通过熟悉的工作流程进行。
此示例展示了...
使用 Dask-ML Incremental 元估计器包装实现
partial_fit
的 Scikit-Learn 估计器在此包装的估计器上进行训练、预测和评分
尽管此示例使用了 Scikit-Learn 的 SGDClassifier,但 Incremental
元估计器适用于任何实现 partial_fit
方法和 scikit-learn 基估计器 API 的类。
设置 Dask¶
我们首先启动一个 Dask 客户端,以访问 Dask 仪表盘,它将提供进度和性能指标。
运行单元格后,您可以点击仪表盘链接查看仪表盘
[1]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:
客户端
Client-6fe04c8e-0de1-11ed-a3a3-000d3a8f7959
连接方法: Cluster 对象 | 集群类型: distributed.LocalCluster |
仪表盘: http://127.0.0.1:8787/status |
集群信息
LocalCluster
85c11ebe
仪表盘: http://127.0.0.1:8787/status | 工作进程 4 |
总线程数 4 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-55b9e8e1-2630-4f2c-b9a8-94f5f1e77a1c
通信地址: tcp://127.0.0.1:38373 | 工作进程 4 |
仪表盘: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 6.78 GiB |
工作进程
工作进程:0
通信地址: tcp://127.0.0.1:43257 | 总线程数 1 |
仪表盘: http://127.0.0.1:43969/status | 内存: 1.70 GiB |
Nanny 地址: tcp://127.0.0.1:43431 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-pf961cd2 |
工作进程:1
通信地址: tcp://127.0.0.1:35171 | 总线程数 1 |
仪表盘: http://127.0.0.1:43051/status | 内存: 1.70 GiB |
Nanny 地址: tcp://127.0.0.1:33129 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-g873798m |
工作进程:2
通信地址: tcp://127.0.0.1:40821 | 总线程数 1 |
仪表盘: http://127.0.0.1:44433/status | 内存: 1.70 GiB |
Nanny 地址: tcp://127.0.0.1:44935 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-_7hesy4q |
工作进程:3
通信地址: tcp://127.0.0.1:37923 | 总线程数 1 |
仪表盘: http://127.0.0.1:43331/status | 内存: 1.70 GiB |
Nanny 地址: tcp://127.0.0.1:33679 | |
本地目录: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-pxcnehmb |
创建数据¶
我们创建一个合成数据集,它足够大以引起兴趣,但又足够小以便快速运行。
我们的数据集有 1,000,000 个样本和 100 个特征。
[2]:
import dask
import dask.array as da
from dask_ml.datasets import make_classification
n, d = 100000, 100
X, y = make_classification(n_samples=n, n_features=d,
chunks=n // 10, flip_y=0.2)
X
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:1283: UserWarning: Running on a single-machine scheduler when a distributed client is active might lead to unexpected results.
warnings.warn(
[2]:
|
有关从真实数据创建 Dask 数组和 DataFrame 的更多信息,请参阅关于Dask 数组和Dask DataFrame的文档。
划分训练和测试数据¶
我们将数据集划分为训练数据和测试数据,以确保进行公平的测试,从而辅助评估。
[3]:
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train
[3]:
|
将数据持久化到内存¶
该数据集足够小,可以放入分布式内存中,因此我们调用 dask.persist
要求 Dask 执行上述计算并将结果保留在内存中。
[4]:
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)
如果您处理的数据集无法完全放入内存中,则应跳过此步骤。一切仍将正常工作,但速度会更慢,并且使用较少的内存。
调用 dask.persist
将把数据保存在内存中,这样当我们多次遍历数据时,就不需要进行计算。例如,如果我们的数据来自 CSV 文件且未持久化,则每次遍历都需要重新读取 CSV 文件。如果数据无法放入 RAM 中,这样做是可取的,否则会降低计算速度。
预计算类别¶
我们从训练数据中预计算出类别,这是此分类示例所必需的。
[5]:
classes = da.unique(y_train).compute()
classes
[5]:
array([0, 1])
创建 Scikit-Learn 模型¶
我们创建底层的 Scikit-Learn 估计器,一个 SGDClassifier
[6]:
from sklearn.linear_model import SGDClassifier
est = SGDClassifier(loss='log', penalty='l2', tol=1e-3)
这里我们使用了 SGDClassifier
,但任何实现了 partial_fit
方法的估计器都可以工作。实现此 API 的 Scikit-Learn 模型列表可在此处找到:here。
使用 Dask-ML 的 Incremental 元估计器包装¶
现在,我们使用 `dask_ml.wrappers.Incremental
<https://ml.dask.org.cn/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental>`__ 元估计器来包装我们的 SGDClassifier
。
[7]:
from dask_ml.wrappers import Incremental
inc = Incremental(est, scoring='accuracy')
回忆一下,Incremental
只负责数据管理,而将实际算法留给底层的 Scikit-Learn 估计器。
注意:我们在上面的 Dask 估计器中设置了 scoring 参数,以告知它处理评分。在使用 Dask 数组作为测试数据时,这会更好地工作。
模型训练¶
Incremental
实现了一个 fit
方法,它将对数据集进行一次遍历,并对 Dask 数组中的每个块调用 partial_fit
。
在此拟合过程中,您可以查看仪表盘以观察多个批次的顺序拟合过程。
[8]:
inc.fit(X_train, y_train, classes=classes)
[8]:
Incremental(estimator=SGDClassifier(loss='log'), scoring='accuracy')
[9]:
inc.score(X_test, y_test)
[9]:
0.5942
多次遍历训练数据¶
调用 .fit
会遍历数据的每个块一次。然而,在许多情况下,我们可能希望多次遍历训练数据。为此,我们可以使用 Incremental.partial_fit
方法和 for 循环。
[10]:
est = SGDClassifier(loss='log', penalty='l2', tol=0e-3)
inc = Incremental(est, scoring='accuracy')
[11]:
for _ in range(10):
inc.partial_fit(X_train, y_train, classes=classes)
print('Score:', inc.score(X_test, y_test))
Score: 0.6102
Score: 0.5896
Score: 0.5897
Score: 0.6159
Score: 0.6154
Score: 0.62
Score: 0.6254
Score: 0.6394
Score: 0.6345
Score: 0.637
预测和评分¶
最后,我们还可以在测试数据上调用 Incremental.predict
和 Incremental.score
[12]:
inc.predict(X_test) # Predict produces lazy dask arrays
[12]:
|
[13]:
inc.predict(X_test)[:100].compute() # call compute to get results
[13]:
array([1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1,
0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0,
0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0,
1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1,
0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0])
[14]:
inc.score(X_test, y_test)
[14]:
0.637
了解更多¶
在本 Notebook 中,我们介绍了如何使用 Dask-ML 的 Incremental
元估计器来自动化实现 partial_fit
方法的 Scikit-Learn 估计器的增量训练过程。如果您想了解更多关于此过程的信息,可以查阅以下文档