在线 Notebook

您可以在 在线会话 中运行此 notebook Binder,或在 Github 上查看。

Dask DataFrames

Dask dataframes are blocked Pandas dataframes

Dask Dataframes 协调许多 Pandas dataframes,沿索引进行分区。它们支持 Pandas API 的大部分子集。

启动 Dask Client 以访问仪表板

启动 Dask Client 是可选的。它将提供一个仪表板,有助于深入了解计算过程。

创建客户端后,仪表板的链接将可见。我们建议在使用 notebook 时,将仪表板打开在屏幕的一侧。这可能需要一些努力来调整窗口布局,但在学习时同时查看两者会非常有用。

[1]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit="1GB")
client

[1]:

客户端

Client-e0ebc4ca-0ddf-11ed-98b5-000d3a8f7959

连接方法: Cluster object 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

创建随机 Dataframe

我们创建一个具有以下属性的随机时间序列数据

  1. 它存储了 2000 年 1 月份每一秒的记录

  2. 它按天分割该月份,将每一天作为一个分区 dataframe

  3. 除了 datetime 索引外,它还包含 names, ids 和 numeric values 列

这是一个约 240 MB 的小数据集。通过设置 `dask.datasets.timeseries() 函数的一些参数 <https://docs.dask.org.cn/en/stable/api.html#dask.datasets.timeseries>`__,可以增加天数或减小数据点之间的时间间隔,以便使用更大的数据集进行练习。

[2]:
import dask

df = dask.datasets.timeseries()

与 Pandas 不同,Dask DataFrames 是惰性的 (lazy),这意味着只有在计算需要时才会加载数据。这里没有打印数据,而是用省略号 (...) 代替。

[3]:
df

[3]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask 名称:make-timeseries, 30 tasks

尽管如此,列名和 dtypes 是已知的。

[4]:
df.dtypes

[4]:
id        int64
name     object
x       float64
y       float64
dtype: object

一些操作会自动显示数据。

[5]:
# This sets some formatting parameters for displayed data.
import pandas as pd

pd.options.display.precision = 2
pd.options.display.max_rows = 10

[6]:
df.head(3)

[6]:
id name x y
timestamp
2000-01-01 00:00:00 999 Patricia 0.86 0.50
2000-01-01 00:00:01 974 Alice -0.04 0.25
2000-01-01 00:00:02 984 Ursula -0.05 -0.92

使用标准 Pandas 操作

大多数常用的 Pandas 操作可以在 Dask dataframes 上以相同的方式使用。本示例展示了如何根据掩码条件对数据进行切片,然后确定 x 列中数据的标准差。

[7]:
df2 = df[df.y > 0]
df3 = df2.groupby("name").x.std()
df3

[7]:
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks

请注意,df3 中的数据仍然用省略号表示。上一个单元格中的所有操作都是惰性操作。当您希望将结果作为 Pandas dataframe 或 series 时,可以调用 .compute()

如果您在上面启动了 Client(),那么您可以在计算期间查看状态页面以查看进度。

[8]:
computed_df = df3.compute()
type(computed_df)

[8]:
pandas.core.series.Series
[9]:
computed_df

[9]:
name
Alice      0.58
Bob        0.58
Charlie    0.58
Dan        0.58
Edith      0.58
           ...
Victor     0.58
Wendy      0.58
Xavier     0.58
Yvonne     0.58
Zelda      0.58
Name: x, Length: 26, dtype: float64

请注意,计算后的数据现在已显示在输出中。

另一个计算示例是聚合多个列,如下所示。同样,仪表板将显示计算的进度。

[10]:
df4 = df.groupby("name").aggregate({"x": "sum", "y": "max"})
df4.compute()

[10]:
x y
name
Alice 172.30 1.0
Bob 54.79 1.0
Charlie 255.01 1.0
Dan 93.23 1.0
Edith 155.41 1.0
... ... ...
Victor -303.80 1.0
Wendy -20.97 1.0
Xavier 112.18 1.0
Yvonne 335.84 1.0
Zelda 205.43 1.0

26 行 × 2 列

Dask dataframes 也可以像 Pandas dataframes 一样进行连接。在本例中,我们将 df4 中的聚合数据与 df 中的原始数据连接起来。由于 df 的索引是时间序列,而 df4 是按名称索引的,我们使用 left_on="name"right_index=True 来定义合并列。我们还为两个 dataframe 之间共有的列设置了后缀,以便区分它们。

最后,由于 df4 很小,我们也确保它是一个单分区 dataframe。

[11]:
df4 = df4.repartition(npartitions=1)
joined = df.merge(
    df4, left_on="name", right_index=True, suffixes=("_original", "_aggregated")
)
joined.head()

[11]:
id name x_original y_original x_aggregated y_aggregated
timestamp
2000-01-01 00:00:00 999 Patricia 0.86 0.50 -34.68 1.0
2000-01-01 00:00:03 988 Patricia -0.57 -0.67 -34.68 1.0
2000-01-01 00:00:12 1038 Patricia -0.48 0.35 -34.68 1.0
2000-01-01 00:01:16 964 Patricia -0.25 0.13 -34.68 1.0
2000-01-01 00:01:33 1050 Patricia -0.58 -0.38 -34.68 1.0

将数据持久化到内存

如果您的数据集有足够的可用 RAM,您可以将数据持久化到内存中。

这使得未来的计算速度更快。

[12]:
df = df.persist()

时间序列操作

因为 df 具有 datetime 索引,时间序列操作可以高效运行。

下面的第一个示例以 1 小时为间隔对数据进行重采样,以减小 dataframe 的总大小。然后计算 xy 列的平均值。

[13]:
df[["x", "y"]].resample("1h").mean().head()

[13]:
x y
timestamp
2000-01-01 00:00:00 1.96e-03 1.43e-02
2000-01-01 01:00:00 5.23e-03 1.51e-02
2000-01-01 02:00:00 -9.91e-04 -9.25e-04
2000-01-01 03:00:00 -2.57e-03 -5.00e-03
2000-01-01 04:00:00 -5.71e-03 1.16e-02

下一个示例以 24 小时为间隔对数据进行重采样并绘制平均值。请注意,plot()compute() 之后调用,因为 plot() 只有在数据计算完成后才能工作。

[14]:
%matplotlib inline
df[['x', 'y']].resample('24h').mean().compute().plot();
_images/dataframe_26_0.png

最后一个示例计算数据的滚动 24 小时平均值。

[15]:
df[["x", "y"]].rolling(window="24h").mean().head()

[15]:
x y
timestamp
2000-01-01 00:00:00 0.86 0.50
2000-01-01 00:00:01 0.41 0.38
2000-01-01 00:00:02 0.25 -0.05
2000-01-01 00:00:03 0.05 -0.21
2000-01-01 00:00:04 0.18 -0.18

沿着索引进行随机访问是廉价的,但由于 Dask dataframe 是惰性的,必须进行计算才能将数据实例化。

[16]:
df.loc["2000-01-05"]

[16]:
Dask DataFrame 结构
id name x y
npartitions=1
2000-01-05 00:00:00.000000000 int64 object float64 float64
2000-01-05 23:59:59.999999999 ... ... ... ...
Dask 名称:loc, 31 tasks
[17]:
%time df.loc['2000-01-05'].compute()
CPU times: user 28.7 ms, sys: 7.62 ms, total: 36.3 ms
Wall time: 64.2 ms
[17]:
id name x y
timestamp
2000-01-05 00:00:00 1001 Hannah 0.85 -0.23
2000-01-05 00:00:01 1021 Charlie -0.09 -0.42
2000-01-05 00:00:02 974 Zelda 0.70 -0.81
2000-01-05 00:00:03 1015 Sarah 0.35 -0.13
2000-01-05 00:00:04 989 Frank -0.26 -0.96
... ... ... ... ...
2000-01-05 23:59:55 1023 Alice 0.68 0.21
2000-01-05 23:59:56 982 Alice 0.90 0.74
2000-01-05 23:59:57 941 Sarah -0.49 -0.39
2000-01-05 23:59:58 1009 Kevin 0.89 -0.26
2000-01-05 23:59:59 1031 Hannah 0.80 0.53

86400 行 × 4 列

设置索引

数据按索引列排序。这可以加快访问、连接、groupby-apply 等操作。然而,并行排序数据可能会耗费成本,因此设置索引既重要,但不应频繁进行。在接下来的几个示例中,我们将按 name 列对数据进行分组,因此我们将该列设置为索引以提高效率。

[18]:
df5 = df.set_index("name")
df5

[18]:
Dask DataFrame 结构
id x y
npartitions=26
Alice int64 float64 float64
Bob ... ... ...
... ... ... ...
Zelda ... ... ...
Zelda ... ... ...
Dask 名称:sort_index, 954 tasks

由于重置此数据集的索引成本较高,并且我们可以将其放入可用 RAM 中,因此我们将数据集持久化到内存。

[19]:
df5 = df5.persist()
df5

[19]:
Dask DataFrame 结构
id x y
npartitions=26
Alice int64 float64 float64
Bob ... ... ...
... ... ... ...
Zelda ... ... ...
Zelda ... ... ...
Dask 名称:sort_index, 26 tasks

Dask 现在知道所有数据存储在哪里,并按名称索引。因此,随机访问等操作变得廉价且高效。

[20]:
%time df5.loc['Alice'].compute()
CPU times: user 360 ms, sys: 44.7 ms, total: 404 ms
Wall time: 2.35 s
[20]:
id x y
name
Alice 974 -0.04 0.25
Alice 1001 0.44 0.55
Alice 1039 -0.38 0.88
Alice 974 -0.71 0.12
Alice 960 0.98 -0.21
... ... ... ...
Alice 994 0.56 0.90
Alice 999 0.03 -0.11
Alice 988 -0.49 -0.80
Alice 999 -0.35 -0.20
Alice 1079 -0.29 0.88

99801 行 × 3 列

结合 Scikit-Learn 使用 Groupby Apply

现在我们的数据已按名称排序,我们可以廉价地进行按名称随机访问或使用自定义函数进行 groupby-apply 等操作。

这里我们在每个名称上训练一个不同的 scikit-learn 线性回归模型。

[21]:
from sklearn.linear_model import LinearRegression


def train(partition):
    if not len(partition):
        return
    est = LinearRegression()
    est.fit(partition[["x"]].values, partition.y.values)
    return est

train() 函数的 partition 参数将是 DataFrameGroupBy 中的一个分组实例。如果分区中没有数据,则无需继续。如果存在数据,我们将拟合线性回归模型并将其作为此组的值返回。

现在处理 df5,其索引是来自 df 的名称,我们可以按 names 列进行分组。这恰好也是索引,但这没关系。然后我们使用 .apply().groupby() 生成的 DataFrameGroupBy 中的每个组运行 train()

meta 参数告诉 Dask 如何创建用于保存 .apply() 结果的 DataFrameSeries。在本例中,train() 返回一个单一值,因此 .apply() 将创建一个 Series。这意味着我们需要告诉 Dask 该单一列的类型,并可以选择为其命名。

指定单列的最简单方法是使用元组。列名是元组的第一个元素。由于这是一系列线性回归结果,我们将列命名为 "LinearRegression"。元组的第二个元素是 train 返回值的类型。在这种情况下,Pandas 将结果存储为一般 object 类型,这就是我们应该传递的类型。

[22]:
df6 = df5.groupby("name").apply(
    train, meta=("LinearRegression", object)
).compute()
df6
[22]:
name
Alice      LinearRegression()
Bob        LinearRegression()
Charlie    LinearRegression()
Dan        LinearRegression()
Edith      LinearRegression()
                  ...
Victor     LinearRegression()
Wendy      LinearRegression()
Xavier     LinearRegression()
Yvonne     LinearRegression()
Zelda      LinearRegression()
Name: LinearRegression, Length: 26, dtype: object

延伸阅读

有关 Dask dataframes 更深入的介绍,请参阅 dask 教程 的 notebooks 04 和 07。