实时 Notebook

你可以在实时会话中运行此 Notebook Binder 或在 Github 上查看

逐步训练大型数据集

我们可以一次训练一个批次的大型数据集上的模型。许多 Scikit-Learn 估计器实现了 partial_fit 方法,以支持批量增量学习。

est = SGDClassifier(...)
est.partial_fit(X_train_1, y_train_1)
est.partial_fit(X_train_2, y_train_2)
...

Scikit-Learn 文档在其用户指南中更深入地讨论了这种方法。

本 Notebook 演示了 Dask-ML 的 Incremental 元估计器的用法,它自动化了在 Dask 数组和 DataFrame 上使用 Scikit-Learn 的 partial_fit 的过程。Scikit-Learn 处理所有计算,而 Dask 根据需要处理数据管理、加载和移动数据批次。这使得可以在许多机器上分布式处理大型数据集,或处理无法完全放入内存的数据集,所有这些都通过熟悉的工作流程进行。

此示例展示了...

  • 使用 Dask-ML Incremental 元估计器包装实现 partial_fit 的 Scikit-Learn 估计器

  • 在此包装的估计器上进行训练、预测和评分

尽管此示例使用了 Scikit-Learn 的 SGDClassifier,但 Incremental 元估计器适用于任何实现 partial_fit 方法和 scikit-learn 基估计器 API 的类。

c4999601f3f1404e9d9b52727a0d92a1 e3baadfd3b404eabbfbd75b346642908

设置 Dask

我们首先启动一个 Dask 客户端,以访问 Dask 仪表盘,它将提供进度和性能指标。

运行单元格后,您可以点击仪表盘链接查看仪表盘

[1]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:

客户端

Client-6fe04c8e-0de1-11ed-a3a3-000d3a8f7959

连接方法: Cluster 对象 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

创建数据

我们创建一个合成数据集,它足够大以引起兴趣,但又足够小以便快速运行。

我们的数据集有 1,000,000 个样本和 100 个特征。

[2]:
import dask
import dask.array as da
from dask_ml.datasets import make_classification


n, d = 100000, 100

X, y = make_classification(n_samples=n, n_features=d,
                           chunks=n // 10, flip_y=0.2)
X
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:1283: UserWarning: Running on a single-machine scheduler when a distributed client is active might lead to unexpected results.
  warnings.warn(
[2]:
数组
字节 76.29 MiB 7.63 MiB
形状 (100000, 100) (10000, 100)
计数 10 个任务 10 个块
类型 float64 numpy.ndarray
100 100000

有关从真实数据创建 Dask 数组和 DataFrame 的更多信息,请参阅关于Dask 数组Dask DataFrame的文档。

划分训练和测试数据

我们将数据集划分为训练数据和测试数据,以确保进行公平的测试,从而辅助评估。

[3]:
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train
[3]:
数组
字节 68.66 MiB 6.87 MiB
形状 (90000, 100) (9000, 100)
计数 70 个任务 10 个块
类型 float64 numpy.ndarray
100 90000

将数据持久化到内存

该数据集足够小,可以放入分布式内存中,因此我们调用 dask.persist 要求 Dask 执行上述计算并将结果保留在内存中。

[4]:
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)

如果您处理的数据集无法完全放入内存中,则应跳过此步骤。一切仍将正常工作,但速度会更慢,并且使用较少的内存。

调用 dask.persist 将把数据保存在内存中,这样当我们多次遍历数据时,就不需要进行计算。例如,如果我们的数据来自 CSV 文件且未持久化,则每次遍历都需要重新读取 CSV 文件。如果数据无法放入 RAM 中,这样做是可取的,否则会降低计算速度。

预计算类别

我们从训练数据中预计算出类别,这是此分类示例所必需的。

[5]:
classes = da.unique(y_train).compute()
classes
[5]:
array([0, 1])

创建 Scikit-Learn 模型

我们创建底层的 Scikit-Learn 估计器,一个 SGDClassifier

[6]:
from sklearn.linear_model import SGDClassifier

est = SGDClassifier(loss='log', penalty='l2', tol=1e-3)

这里我们使用了 SGDClassifier,但任何实现了 partial_fit 方法的估计器都可以工作。实现此 API 的 Scikit-Learn 模型列表可在此处找到:here

使用 Dask-ML 的 Incremental 元估计器包装

现在,我们使用 `dask_ml.wrappers.Incremental <https://ml.dask.org.cn/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental>`__ 元估计器来包装我们的 SGDClassifier

[7]:
from dask_ml.wrappers import Incremental

inc = Incremental(est, scoring='accuracy')

回忆一下,Incremental 只负责数据管理,而将实际算法留给底层的 Scikit-Learn 估计器。

注意:我们在上面的 Dask 估计器中设置了 scoring 参数,以告知它处理评分。在使用 Dask 数组作为测试数据时,这会更好地工作。

模型训练

Incremental 实现了一个 fit 方法,它将对数据集进行一次遍历,并对 Dask 数组中的每个块调用 partial_fit

在此拟合过程中,您可以查看仪表盘以观察多个批次的顺序拟合过程。

[8]:
inc.fit(X_train, y_train, classes=classes)
[8]:
Incremental(estimator=SGDClassifier(loss='log'), scoring='accuracy')
[9]:
inc.score(X_test, y_test)
[9]:
0.5942

多次遍历训练数据

调用 .fit 会遍历数据的每个块一次。然而,在许多情况下,我们可能希望多次遍历训练数据。为此,我们可以使用 Incremental.partial_fit 方法和 for 循环。

[10]:
est = SGDClassifier(loss='log', penalty='l2', tol=0e-3)
inc = Incremental(est, scoring='accuracy')
[11]:
for _ in range(10):
    inc.partial_fit(X_train, y_train, classes=classes)
    print('Score:', inc.score(X_test, y_test))
Score: 0.6102
Score: 0.5896
Score: 0.5897
Score: 0.6159
Score: 0.6154
Score: 0.62
Score: 0.6254
Score: 0.6394
Score: 0.6345
Score: 0.637

预测和评分

最后,我们还可以在测试数据上调用 Incremental.predictIncremental.score

[12]:
inc.predict(X_test)  # Predict produces lazy dask arrays
[12]:
数组
字节 78.12 kiB 7.81 kiB
形状 (10000,) (1000,)
计数 20 个任务 10 个块
类型 int64 numpy.ndarray
10000 1
[13]:
inc.predict(X_test)[:100].compute()  # call compute to get results
[13]:
array([1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1,
       0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0,
       0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0,
       1, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1,
       0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0])
[14]:
inc.score(X_test, y_test)
[14]:
0.637

了解更多

在本 Notebook 中,我们介绍了如何使用 Dask-ML 的 Incremental 元估计器来自动化实现 partial_fit 方法的 Scikit-Learn 估计器的增量训练过程。如果您想了解更多关于此过程的信息,可以查阅以下文档

  1. https://scikit-learn.cn/stable/computing/scaling_strategies.html

  2. Dask-ML Incremental API 文档

  3. 与 Dask-ML Incremental 兼容的 Scikit-Learn 估计器列表

  4. 有关用于模型评估的训练-测试划分的更多信息,请参阅超参数和模型验证