实时 Notebook

您可以在 实时会话 中运行此 notebook Binder 或在 Github 上查看

DataFrames: Groupby

本 notebook 使用可扩展的 Dask dataframes 上的 Pandas groupby-aggregate 和 groupby-apply 操作。它将讨论常见用法和最佳实践。

启动 Dask 客户端以获取仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,有助于深入了解计算过程。

在下方创建客户端后,仪表板链接将变得可见。我们建议将其在屏幕一侧打开,而您的 notebook 在另一侧。这可能需要一些精力来安排窗口,但在学习时同时查看两者非常有用。

[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:

客户端

Client-f66fcc00-0de0-11ed-a18c-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

人工数据集

我们创建一个人工时间序列数据集来帮助我们处理 groupby 操作

[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 个任务

此数据集足够小,可以放入集群内存中,因此我们现在将其持久化。

如果您的数据集太大而无法放入内存,则可以跳过此步骤。

[3]:
df = df.persist()

Groupby 聚合

Dask dataframes 实现了 Pandas groupby API 的常用子集(参见 Pandas Groupby 文档)。

我们从 groupby 聚合开始。假设组的数量较少(少于一百万),这些聚合通常非常高效。

[4]:
df.groupby('name').x.mean().compute()
[4]:
name
Alice      -0.000260
Bob        -0.000536
Charlie     0.001401
Dan         0.000685
Edith      -0.000263
Frank      -0.000048
George     -0.001241
Hannah     -0.002285
Ingrid     -0.002062
Jerry       0.000779
Kevin       0.002219
Laura      -0.000472
Michael    -0.003100
Norbert    -0.001520
Oliver      0.001986
Patricia    0.000934
Quinn      -0.000233
Ray        -0.001600
Sarah      -0.000426
Tim         0.001102
Ursula      0.002923
Victor     -0.000821
Wendy      -0.001977
Xavier     -0.002330
Yvonne     -0.001805
Zelda      -0.004697
Name: x, dtype: float64

性能取决于您进行的聚合(均值 vs 标准差)、您进行分组的键(name vs id)以及总的组数。

[5]:
%time _ = df.groupby('id').x.mean().compute()
CPU times: user 63.9 ms, sys: 3.83 ms, total: 67.7 ms
Wall time: 235 ms
[6]:
%time _ = df.groupby('name').x.mean().compute()
CPU times: user 87.4 ms, sys: 0 ns, total: 87.4 ms
Wall time: 452 ms
[7]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()
CPU times: user 66.1 ms, sys: 4.25 ms, total: 70.4 ms
Wall time: 355 ms
[7]:
x y
mean std mean count
name
Alice -0.000260 0.577759 0.001977 99471
Bob -0.000536 0.577571 -0.000872 99283
Charlie 0.001401 0.577126 -0.001523 99326
Dan 0.000685 0.577608 -0.002160 99687
Edith -0.000263 0.577649 0.001361 100010

这与 Pandas 中的情况相同。一般来说,Dask.dataframe 的 groupby 聚合与 Pandas 的 groupby 聚合性能大致相同,只是更具可扩展性。

您可以在 Pandas 文档中了解更多关于 Pandas 常见聚合的信息。

自定义聚合

Dask dataframe Aggregate 可用于自定义聚合(参见 Dask dataframe Aggregate 文档)。

多个组

默认情况下,groupby 聚合(如 groupby-mean 或 groupby-sum)将结果作为单分区 Dask dataframe 返回。其结果通常相当小,因此这通常是一个不错的选择。

然而,有时人们希望对多个组(数百万或更多)进行 groupby 聚合。在这种情况下,完整的结果可能无法放入单个 Pandas dataframe 输出中,您可能需要将输出拆分为多个分区。您可以使用 split_out= 参数来控制此行为。

[8]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
[8]:
../_images/dataframes_02-groupby_16_0.png
[9]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})
[9]:
../_images/dataframes_02-groupby_17_0.png

Groupby Apply

Groupby 聚合通常非常快速,因为它们可以轻松分解为众所周知的操作。数据不需要过多地移动,我们只需在网络上传递小的中间值。

然而,对于某些操作,要应用的函数需要给定组的全部数据(例如名为“Alice”的每个记录)。这将导致大量通信,成本更高,但 Groupby-apply 方法仍然可以实现。如果 groupby 聚合可行,则应避免使用此方法。

在以下示例中,我们在每个人的姓名上训练一个简单的 Scikit-Learn 机器学习模型。

[10]:
from sklearn.linear_model import LinearRegression

def train(partition):
    if partition.empty:
        return
    est = LinearRegression()
    est.fit(partition[['x', 'id']].values, partition.y.values)
    return est
[11]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
CPU times: user 676 ms, sys: 46.7 ms, total: 723 ms
Wall time: 4.25 s
[11]:
name
Alice       LinearRegression()
Bob         LinearRegression()
Charlie     LinearRegression()
Dan         LinearRegression()
Edith       LinearRegression()
Frank       LinearRegression()
George      LinearRegression()
Hannah      LinearRegression()
Ingrid      LinearRegression()
Jerry       LinearRegression()
Kevin       LinearRegression()
Laura       LinearRegression()
Michael     LinearRegression()
Norbert     LinearRegression()
Oliver      LinearRegression()
Patricia    LinearRegression()
Quinn       LinearRegression()
Ray         LinearRegression()
Sarah       LinearRegression()
Tim         LinearRegression()
Ursula      LinearRegression()
Victor      LinearRegression()
Wendy       LinearRegression()
Xavier      LinearRegression()
Yvonne      LinearRegression()
Zelda       LinearRegression()
dtype: object
[ ]: