为大型数据集评分和预测

实时笔记本

您可以在实时会话中运行此笔记本 Binder在 Github 上查看

为大型数据集评分和预测

有时您会在适合内存的较小数据集上进行训练,但需要对大得多的(可能大于内存的)数据集进行预测或评分。也许您的学习曲线已经趋于平缓,或者您只有部分数据的标签。

在这种情况下,您可以使用ParallelPostFit来并行化和分布式处理评分或预测步骤。

[1]:
from dask.distributed import Client, progress

# Scale up: connect to your own cluster with bmore resources
# see https://dask.org.cn/en/latest/setup.html
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client
[1]:

客户端

Client-7847b57a-0de1-11ed-a43b-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表盘: http://10.1.1.64:8787/status

集群信息

[2]:
import numpy as np
import dask.array as da
from sklearn.datasets import make_classification

我们将使用 scikit-learn 生成一个小型的随机数据集。

[3]:
X_train, y_train = make_classification(
    n_features=2, n_redundant=0, n_informative=2,
    random_state=1, n_clusters_per_class=1, n_samples=1000)
X_train[:5]
[3]:
array([[ 1.53682958, -1.39869399],
       [ 1.36917601, -0.63734411],
       [ 0.50231787, -0.45910529],
       [ 1.83319262, -1.29808229],
       [ 1.04235568,  1.12152929]])

我们将使用 dask.array 多次克隆该数据集。X_largey_large 代表我们大于内存的数据集。

[4]:
# Scale up: increase N, the number of times we replicate the data.
N = 100
X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)
                          for _ in range(N)])
y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)
                          for _ in range(N)])
X_large
[4]:
Array Chunk
Bytes 1.53 MiB 15.62 kiB
Shape (100000, 2) (1000, 2)
Count 101 Tasks 100 Chunks
Type float64 numpy.ndarray
2 100000

由于我们的训练数据集适合内存,我们可以使用 scikit-learn 估计器作为训练期间实际拟合的估计器。但我们知道需要对大型数据集进行预测,因此我们将使用 ParallelPostFit 包装 scikit-learn 估计器。

[5]:
from sklearn.linear_model import LogisticRegressionCV
from dask_ml.wrappers import ParallelPostFit
[6]:
clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")

关于何时以及为何需要 scoring 参数,请参阅 dask-ml 文档中的说明:https://ml.dask.org.cn/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit

现在我们将调用 clf.fit。Dask-ML 在这里不做任何事情,因此此步骤只能使用适合内存的数据集。

[7]:
clf.fit(X_train, y_train)
[7]:
ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')

训练完成后,我们将转向对完整(大于内存的)数据集进行预测。

[8]:
y_pred = clf.predict(X_large)
y_pred
[8]:
Array Chunk
Bytes 781.25 kiB 7.81 kiB
Shape (100000,) (1000,)
Count 201 Tasks 100 Chunks
Type int64 numpy.ndarray
100000 1

y_pred 是 Dask array。工作节点可以将预测值写入共享文件系统,而无需在单个机器上收集数据。

或者我们可以在整个大型数据集上检查模型的得分。计算将并行进行,无需单个机器来保存所有数据。

[9]:
clf.score(X_large, y_large)
[9]:
0.596