DataFrames: Groupby
目录
实时 Notebook
您可以在 实时会话 中运行此 notebook 或在 Github 上查看。
DataFrames: Groupby¶
本 notebook 使用可扩展的 Dask dataframes 上的 Pandas groupby-aggregate 和 groupby-apply 操作。它将讨论常见用法和最佳实践。
启动 Dask 客户端以获取仪表板¶
启动 Dask 客户端是可选的。它将提供一个仪表板,有助于深入了解计算过程。
在下方创建客户端后,仪表板链接将变得可见。我们建议将其在屏幕一侧打开,而您的 notebook 在另一侧。这可能需要一些精力来安排窗口,但在学习时同时查看两者非常有用。
[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:
客户端
Client-f66fcc00-0de0-11ed-a18c-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
40f50cf8
仪表板: http://127.0.0.1:8787/status | 工作节点 1 |
总线程数 4 | 总内存: 1.86 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-c95eb614-e548-45f8-bbbb-9867335584b4
通信: tcp://127.0.0.1:37903 | 工作节点 1 |
仪表板: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 1.86 GiB |
工作节点
工作节点:0
通信: tcp://127.0.0.1:34411 | 总线程数 4 |
仪表板: http://127.0.0.1:44441/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:36749 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-8cgbsa8t |
人工数据集¶
我们创建一个人工时间序列数据集来帮助我们处理 groupby 操作
[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
此数据集足够小,可以放入集群内存中,因此我们现在将其持久化。
如果您的数据集太大而无法放入内存,则可以跳过此步骤。
[3]:
df = df.persist()
Groupby 聚合¶
Dask dataframes 实现了 Pandas groupby API 的常用子集(参见 Pandas Groupby 文档)。
我们从 groupby 聚合开始。假设组的数量较少(少于一百万),这些聚合通常非常高效。
[4]:
df.groupby('name').x.mean().compute()
[4]:
name
Alice -0.000260
Bob -0.000536
Charlie 0.001401
Dan 0.000685
Edith -0.000263
Frank -0.000048
George -0.001241
Hannah -0.002285
Ingrid -0.002062
Jerry 0.000779
Kevin 0.002219
Laura -0.000472
Michael -0.003100
Norbert -0.001520
Oliver 0.001986
Patricia 0.000934
Quinn -0.000233
Ray -0.001600
Sarah -0.000426
Tim 0.001102
Ursula 0.002923
Victor -0.000821
Wendy -0.001977
Xavier -0.002330
Yvonne -0.001805
Zelda -0.004697
Name: x, dtype: float64
性能取决于您进行的聚合(均值 vs 标准差)、您进行分组的键(name vs id)以及总的组数。
[5]:
%time _ = df.groupby('id').x.mean().compute()
CPU times: user 63.9 ms, sys: 3.83 ms, total: 67.7 ms
Wall time: 235 ms
[6]:
%time _ = df.groupby('name').x.mean().compute()
CPU times: user 87.4 ms, sys: 0 ns, total: 87.4 ms
Wall time: 452 ms
[7]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()
CPU times: user 66.1 ms, sys: 4.25 ms, total: 70.4 ms
Wall time: 355 ms
[7]:
x | y | |||
---|---|---|---|---|
mean | std | mean | count | |
name | ||||
Alice | -0.000260 | 0.577759 | 0.001977 | 99471 |
Bob | -0.000536 | 0.577571 | -0.000872 | 99283 |
Charlie | 0.001401 | 0.577126 | -0.001523 | 99326 |
Dan | 0.000685 | 0.577608 | -0.002160 | 99687 |
Edith | -0.000263 | 0.577649 | 0.001361 | 100010 |
这与 Pandas 中的情况相同。一般来说,Dask.dataframe 的 groupby 聚合与 Pandas 的 groupby 聚合性能大致相同,只是更具可扩展性。
您可以在 Pandas 文档中了解更多关于 Pandas 常见聚合的信息。
自定义聚合¶
Dask dataframe Aggregate 可用于自定义聚合(参见 Dask dataframe Aggregate 文档)。
多个组¶
默认情况下,groupby 聚合(如 groupby-mean 或 groupby-sum)将结果作为单分区 Dask dataframe 返回。其结果通常相当小,因此这通常是一个不错的选择。
然而,有时人们希望对多个组(数百万或更多)进行 groupby 聚合。在这种情况下,完整的结果可能无法放入单个 Pandas dataframe 输出中,您可能需要将输出拆分为多个分区。您可以使用 split_out=
参数来控制此行为。
[8]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
[8]:

[9]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})
[9]:

Groupby Apply¶
Groupby 聚合通常非常快速,因为它们可以轻松分解为众所周知的操作。数据不需要过多地移动,我们只需在网络上传递小的中间值。
然而,对于某些操作,要应用的函数需要给定组的全部数据(例如名为“Alice”的每个记录)。这将导致大量通信,成本更高,但 Groupby-apply 方法仍然可以实现。如果 groupby 聚合可行,则应避免使用此方法。
在以下示例中,我们在每个人的姓名上训练一个简单的 Scikit-Learn 机器学习模型。
[10]:
from sklearn.linear_model import LinearRegression
def train(partition):
if partition.empty:
return
est = LinearRegression()
est.fit(partition[['x', 'id']].values, partition.y.values)
return est
[11]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
CPU times: user 676 ms, sys: 46.7 ms, total: 723 ms
Wall time: 4.25 s
[11]:
name
Alice LinearRegression()
Bob LinearRegression()
Charlie LinearRegression()
Dan LinearRegression()
Edith LinearRegression()
Frank LinearRegression()
George LinearRegression()
Hannah LinearRegression()
Ingrid LinearRegression()
Jerry LinearRegression()
Kevin LinearRegression()
Laura LinearRegression()
Michael LinearRegression()
Norbert LinearRegression()
Oliver LinearRegression()
Patricia LinearRegression()
Quinn LinearRegression()
Ray LinearRegression()
Sarah LinearRegression()
Tim LinearRegression()
Ursula LinearRegression()
Victor LinearRegression()
Wendy LinearRegression()
Xavier LinearRegression()
Yvonne LinearRegression()
Zelda LinearRegression()
dtype: object
[ ]: