Dask DataFrames
目录
Dask DataFrames¶
Dask Dataframes 协调许多 Pandas dataframes,沿索引进行分区。它们支持 Pandas API 的大部分子集。
启动 Dask Client 以访问仪表板¶
启动 Dask Client 是可选的。它将提供一个仪表板,有助于深入了解计算过程。
创建客户端后,仪表板的链接将可见。我们建议在使用 notebook 时,将仪表板打开在屏幕的一侧。这可能需要一些努力来调整窗口布局,但在学习时同时查看两者会非常有用。
[1]:
from dask.distributed import Client
client = Client(n_workers=2, threads_per_worker=2, memory_limit="1GB")
client
[1]:
客户端
Client-e0ebc4ca-0ddf-11ed-98b5-000d3a8f7959
连接方法: Cluster object | 集群类型: distributed.LocalCluster |
仪表板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
fb3316a9
仪表板: http://127.0.0.1:8787/status | 工作节点 2 |
总线程数 4 | 总内存: 1.86 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-44cb79c2-de9d-4ec5-8466-954570037d71
通信: tcp://127.0.0.1:40039 | 工作节点 2 |
仪表板: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 1.86 GiB |
工作节点
工作节点: 0
通信: tcp://127.0.0.1:38545 | 总线程数 2 |
仪表板: http://127.0.0.1:38797/status | 内存: 0.93 GiB |
Nanny: tcp://127.0.0.1:41967 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-7ivifrfo |
工作节点: 1
通信: tcp://127.0.0.1:36071 | 总线程数 2 |
仪表板: http://127.0.0.1:46269/status | 内存: 0.93 GiB |
Nanny: tcp://127.0.0.1:37775 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-7ftt40fj |
创建随机 Dataframe¶
我们创建一个具有以下属性的随机时间序列数据
它存储了 2000 年 1 月份每一秒的记录
它按天分割该月份,将每一天作为一个分区 dataframe
除了 datetime 索引外,它还包含 names, ids 和 numeric values 列
这是一个约 240 MB 的小数据集。通过设置 `dask.datasets.timeseries()
函数的一些参数 <https://docs.dask.org.cn/en/stable/api.html#dask.datasets.timeseries>`__,可以增加天数或减小数据点之间的时间间隔,以便使用更大的数据集进行练习。
[2]:
import dask
df = dask.datasets.timeseries()
与 Pandas 不同,Dask DataFrames 是惰性的 (lazy),这意味着只有在计算需要时才会加载数据。这里没有打印数据,而是用省略号 (...
) 代替。
[3]:
df
[3]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
尽管如此,列名和 dtypes 是已知的。
[4]:
df.dtypes
[4]:
id int64
name object
x float64
y float64
dtype: object
一些操作会自动显示数据。
[5]:
# This sets some formatting parameters for displayed data.
import pandas as pd
pd.options.display.precision = 2
pd.options.display.max_rows = 10
[6]:
df.head(3)
[6]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 999 | Patricia | 0.86 | 0.50 |
2000-01-01 00:00:01 | 974 | Alice | -0.04 | 0.25 |
2000-01-01 00:00:02 | 984 | Ursula | -0.05 | -0.92 |
使用标准 Pandas 操作¶
大多数常用的 Pandas 操作可以在 Dask dataframes 上以相同的方式使用。本示例展示了如何根据掩码条件对数据进行切片,然后确定 x
列中数据的标准差。
[7]:
df2 = df[df.y > 0]
df3 = df2.groupby("name").x.std()
df3
[7]:
Dask Series Structure:
npartitions=1
float64
...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks
请注意,df3
中的数据仍然用省略号表示。上一个单元格中的所有操作都是惰性操作。当您希望将结果作为 Pandas dataframe 或 series 时,可以调用 .compute()
。
如果您在上面启动了 Client()
,那么您可以在计算期间查看状态页面以查看进度。
[8]:
computed_df = df3.compute()
type(computed_df)
[8]:
pandas.core.series.Series
[9]:
computed_df
[9]:
name
Alice 0.58
Bob 0.58
Charlie 0.58
Dan 0.58
Edith 0.58
...
Victor 0.58
Wendy 0.58
Xavier 0.58
Yvonne 0.58
Zelda 0.58
Name: x, Length: 26, dtype: float64
请注意,计算后的数据现在已显示在输出中。
另一个计算示例是聚合多个列,如下所示。同样,仪表板将显示计算的进度。
[10]:
df4 = df.groupby("name").aggregate({"x": "sum", "y": "max"})
df4.compute()
[10]:
x | y | |
---|---|---|
name | ||
Alice | 172.30 | 1.0 |
Bob | 54.79 | 1.0 |
Charlie | 255.01 | 1.0 |
Dan | 93.23 | 1.0 |
Edith | 155.41 | 1.0 |
... | ... | ... |
Victor | -303.80 | 1.0 |
Wendy | -20.97 | 1.0 |
Xavier | 112.18 | 1.0 |
Yvonne | 335.84 | 1.0 |
Zelda | 205.43 | 1.0 |
26 行 × 2 列
Dask dataframes 也可以像 Pandas dataframes 一样进行连接。在本例中,我们将 df4
中的聚合数据与 df
中的原始数据连接起来。由于 df
的索引是时间序列,而 df4
是按名称索引的,我们使用 left_on="name"
和 right_index=True
来定义合并列。我们还为两个 dataframe 之间共有的列设置了后缀,以便区分它们。
最后,由于 df4
很小,我们也确保它是一个单分区 dataframe。
[11]:
df4 = df4.repartition(npartitions=1)
joined = df.merge(
df4, left_on="name", right_index=True, suffixes=("_original", "_aggregated")
)
joined.head()
[11]:
id | name | x_original | y_original | x_aggregated | y_aggregated | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 999 | Patricia | 0.86 | 0.50 | -34.68 | 1.0 |
2000-01-01 00:00:03 | 988 | Patricia | -0.57 | -0.67 | -34.68 | 1.0 |
2000-01-01 00:00:12 | 1038 | Patricia | -0.48 | 0.35 | -34.68 | 1.0 |
2000-01-01 00:01:16 | 964 | Patricia | -0.25 | 0.13 | -34.68 | 1.0 |
2000-01-01 00:01:33 | 1050 | Patricia | -0.58 | -0.38 | -34.68 | 1.0 |
时间序列操作¶
因为 df
具有 datetime 索引,时间序列操作可以高效运行。
下面的第一个示例以 1 小时为间隔对数据进行重采样,以减小 dataframe 的总大小。然后计算 x
和 y
列的平均值。
[13]:
df[["x", "y"]].resample("1h").mean().head()
[13]:
x | y | |
---|---|---|
timestamp | ||
2000-01-01 00:00:00 | 1.96e-03 | 1.43e-02 |
2000-01-01 01:00:00 | 5.23e-03 | 1.51e-02 |
2000-01-01 02:00:00 | -9.91e-04 | -9.25e-04 |
2000-01-01 03:00:00 | -2.57e-03 | -5.00e-03 |
2000-01-01 04:00:00 | -5.71e-03 | 1.16e-02 |
下一个示例以 24 小时为间隔对数据进行重采样并绘制平均值。请注意,plot()
在 compute()
之后调用,因为 plot()
只有在数据计算完成后才能工作。
[14]:
%matplotlib inline
df[['x', 'y']].resample('24h').mean().compute().plot();

最后一个示例计算数据的滚动 24 小时平均值。
[15]:
df[["x", "y"]].rolling(window="24h").mean().head()
[15]:
x | y | |
---|---|---|
timestamp | ||
2000-01-01 00:00:00 | 0.86 | 0.50 |
2000-01-01 00:00:01 | 0.41 | 0.38 |
2000-01-01 00:00:02 | 0.25 | -0.05 |
2000-01-01 00:00:03 | 0.05 | -0.21 |
2000-01-01 00:00:04 | 0.18 | -0.18 |
沿着索引进行随机访问是廉价的,但由于 Dask dataframe 是惰性的,必须进行计算才能将数据实例化。
[16]:
df.loc["2000-01-05"]
[16]:
id | name | x | y | |
---|---|---|---|---|
npartitions=1 | ||||
2000-01-05 00:00:00.000000000 | int64 | object | float64 | float64 |
2000-01-05 23:59:59.999999999 | ... | ... | ... | ... |
[17]:
%time df.loc['2000-01-05'].compute()
CPU times: user 28.7 ms, sys: 7.62 ms, total: 36.3 ms
Wall time: 64.2 ms
[17]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-05 00:00:00 | 1001 | Hannah | 0.85 | -0.23 |
2000-01-05 00:00:01 | 1021 | Charlie | -0.09 | -0.42 |
2000-01-05 00:00:02 | 974 | Zelda | 0.70 | -0.81 |
2000-01-05 00:00:03 | 1015 | Sarah | 0.35 | -0.13 |
2000-01-05 00:00:04 | 989 | Frank | -0.26 | -0.96 |
... | ... | ... | ... | ... |
2000-01-05 23:59:55 | 1023 | Alice | 0.68 | 0.21 |
2000-01-05 23:59:56 | 982 | Alice | 0.90 | 0.74 |
2000-01-05 23:59:57 | 941 | Sarah | -0.49 | -0.39 |
2000-01-05 23:59:58 | 1009 | Kevin | 0.89 | -0.26 |
2000-01-05 23:59:59 | 1031 | Hannah | 0.80 | 0.53 |
86400 行 × 4 列
设置索引¶
数据按索引列排序。这可以加快访问、连接、groupby-apply 等操作。然而,并行排序数据可能会耗费成本,因此设置索引既重要,但不应频繁进行。在接下来的几个示例中,我们将按 name
列对数据进行分组,因此我们将该列设置为索引以提高效率。
[18]:
df5 = df.set_index("name")
df5
[18]:
id | x | y | |
---|---|---|---|
npartitions=26 | |||
Alice | int64 | float64 | float64 |
Bob | ... | ... | ... |
... | ... | ... | ... |
Zelda | ... | ... | ... |
Zelda | ... | ... | ... |
由于重置此数据集的索引成本较高,并且我们可以将其放入可用 RAM 中,因此我们将数据集持久化到内存。
[19]:
df5 = df5.persist()
df5
[19]:
id | x | y | |
---|---|---|---|
npartitions=26 | |||
Alice | int64 | float64 | float64 |
Bob | ... | ... | ... |
... | ... | ... | ... |
Zelda | ... | ... | ... |
Zelda | ... | ... | ... |
Dask 现在知道所有数据存储在哪里,并按名称索引。因此,随机访问等操作变得廉价且高效。
[20]:
%time df5.loc['Alice'].compute()
CPU times: user 360 ms, sys: 44.7 ms, total: 404 ms
Wall time: 2.35 s
[20]:
id | x | y | |
---|---|---|---|
name | |||
Alice | 974 | -0.04 | 0.25 |
Alice | 1001 | 0.44 | 0.55 |
Alice | 1039 | -0.38 | 0.88 |
Alice | 974 | -0.71 | 0.12 |
Alice | 960 | 0.98 | -0.21 |
... | ... | ... | ... |
Alice | 994 | 0.56 | 0.90 |
Alice | 999 | 0.03 | -0.11 |
Alice | 988 | -0.49 | -0.80 |
Alice | 999 | -0.35 | -0.20 |
Alice | 1079 | -0.29 | 0.88 |
99801 行 × 3 列
结合 Scikit-Learn 使用 Groupby Apply¶
现在我们的数据已按名称排序,我们可以廉价地进行按名称随机访问或使用自定义函数进行 groupby-apply 等操作。
这里我们在每个名称上训练一个不同的 scikit-learn 线性回归模型。
[21]:
from sklearn.linear_model import LinearRegression
def train(partition):
if not len(partition):
return
est = LinearRegression()
est.fit(partition[["x"]].values, partition.y.values)
return est
train()
函数的 partition
参数将是 DataFrameGroupBy
中的一个分组实例。如果分区中没有数据,则无需继续。如果存在数据,我们将拟合线性回归模型并将其作为此组的值返回。
现在处理 df5
,其索引是来自 df
的名称,我们可以按 names
列进行分组。这恰好也是索引,但这没关系。然后我们使用 .apply()
对 .groupby()
生成的 DataFrameGroupBy
中的每个组运行 train()
。
meta
参数告诉 Dask 如何创建用于保存 .apply()
结果的 DataFrame
或 Series
。在本例中,train()
返回一个单一值,因此 .apply()
将创建一个 Series
。这意味着我们需要告诉 Dask 该单一列的类型,并可以选择为其命名。
指定单列的最简单方法是使用元组。列名是元组的第一个元素。由于这是一系列线性回归结果,我们将列命名为 "LinearRegression"
。元组的第二个元素是 train
返回值的类型。在这种情况下,Pandas 将结果存储为一般 object
类型,这就是我们应该传递的类型。
[22]:
df6 = df5.groupby("name").apply(
train, meta=("LinearRegression", object)
).compute()
df6
[22]:
name
Alice LinearRegression()
Bob LinearRegression()
Charlie LinearRegression()
Dan LinearRegression()
Edith LinearRegression()
...
Victor LinearRegression()
Wendy LinearRegression()
Xavier LinearRegression()
Yvonne LinearRegression()
Zelda LinearRegression()
Name: LinearRegression, Length: 26, dtype: object