在线 Notebook

您可以在 在线会话 中运行此 notebook Binder在 Github 上查看

DataFrames:读写数据

Dask DataFrames 可以以与 Pandas DataFrames 相同的许多格式读写数据。在此示例中,我们使用流行的 CSV 和 Parquet 格式读写数据,并讨论使用这些格式时的最佳实践。

[1]:
from IPython.display import YouTubeVideo

YouTubeVideo("0eEsIA0O1iE")
[1]:

启动 Dask 客户端用于仪表盘

启动 Dask 客户端是可选的。它将提供一个仪表盘,有助于了解计算情况。

创建下面的客户端后,仪表盘的链接将可见。我们建议在屏幕的一侧打开它,而在另一侧使用 notebook。这可能需要一些精力来调整窗口,但同时看到它们对于学习非常有用。

[2]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[2]:

客户端

Client-e20d2897-0de0-11ed-a12a-000d3a8f7959

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

创建人工数据集

首先,我们创建一个人工数据集,并将其写入多个 CSV 文件。

您无需理解本节,我们只是为 notebook 的其余部分创建数据集。

[3]:
import dask
df = dask.datasets.timeseries()
df
[3]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask 名称:make-timeseries, 30 tasks
[4]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name);

读取 CSV 文件

现在我们的数据目录中有许多 CSV 文件,每个文件对应 2000 年 1 月份的一天。每个 CSV 文件包含那一天的时序数据。我们可以使用带有 glob 字符串的 dd.read_csv 函数将所有这些文件作为一个逻辑 dataframe 读取。

[5]:
!ls data/*.csv | head
data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
[6]:
!head data/2000-01-01.csv
timestamp,id,name,x,y
2000-01-01 00:00:00,1009,Jerry,0.9005427499558429,0.3212344670325944
2000-01-01 00:00:01,940,Quinn,0.46795036754868247,-0.01884571513893385
2000-01-01 00:00:02,1017,Ingrid,0.9442706585905265,-0.9229268785155369
2000-01-01 00:00:03,1034,Tim,0.010273653581192255,-0.2850042344432575
2000-01-01 00:00:04,963,Bob,-0.9556052127604173,-0.409805293606079
2000-01-01 00:00:05,992,Ray,0.49090905386189876,-0.8364030355424359
2000-01-01 00:00:06,999,Ray,-0.1791414361782142,0.9108295350480047
2000-01-01 00:00:07,1017,Tim,-0.6121437272121055,0.5585754365941122
2000-01-01 00:00:08,1037,Dan,-0.6931099564135064,-0.6357258139372404
[7]:
!head data/2000-01-30.csv
timestamp,id,name,x,y
2000-01-30 00:00:00,1067,Quinn,-0.9275010814781244,0.7051035850972305
2000-01-30 00:00:01,1011,Quinn,-0.8288674460103511,-0.3018417020358921
2000-01-30 00:00:02,933,Laura,-0.5165326137868189,0.9195088929096915
2000-01-30 00:00:03,1040,Ray,0.8073954879070395,0.9243639047927026
2000-01-30 00:00:04,963,Wendy,0.791167365074305,0.2941664104084778
2000-01-30 00:00:05,1008,Bob,0.38959445411393334,-0.32793662786416844
2000-01-30 00:00:06,1008,Ray,-0.2127878456673038,0.040117377007003796
2000-01-30 00:00:07,1038,Ingrid,0.3092567914432629,0.11665005655447458
2000-01-30 00:00:08,985,Hannah,-0.42749597352375934,-0.3888014211219375

我们可以使用 pandas.read_csv 读取单个文件,或使用 dask.dataframe.read_csv 读取多个文件。

[8]:
import pandas as pd

df = pd.read_csv('data/2000-01-01.csv')
df.head()
[8]:
timestamp id name x y
0 2000-01-01 00:00:00 1009 Jerry 0.900543 0.321234
1 2000-01-01 00:00:01 940 Quinn 0.467950 -0.018846
2 2000-01-01 00:00:02 1017 Ingrid 0.944271 -0.922927
3 2000-01-01 00:00:03 1034 Tim 0.010274 -0.285004
4 2000-01-01 00:00:04 963 Bob -0.955605 -0.409805
[9]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df
[9]:
Dask DataFrame 结构
timestamp id name x y
npartitions=30
object int64 object float64 float64
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask 名称:read-csv, 30 tasks
[10]:
df.head()
[10]:
timestamp id name x y
0 2000-01-01 00:00:00 1009 Jerry 0.900543 0.321234
1 2000-01-01 00:00:01 940 Quinn 0.467950 -0.018846
2 2000-01-01 00:00:02 1017 Ingrid 0.944271 -0.922927
3 2000-01-01 00:00:03 1034 Tim 0.010274 -0.285004
4 2000-01-01 00:00:04 963 Bob -0.955605 -0.409805

调整 read_csv

Pandas 的 read_csv 函数有许多选项可帮助您解析文件。Dask 版本内部使用 Pandas 函数,因此支持许多相同的选项。您可以使用 ? 运算符查看完整的文档字符串。

[11]:
pd.read_csv?
[12]:
dd.read_csv?

在这种情况下,我们使用 parse_dates 关键字将 timestamp 列解析为 datetime 类型。这将使未来操作更高效。请注意,timestamp 列的 dtype 已从 object 更改为 datetime64[ns]

[13]:
df = dd.read_csv('data/2000-*-*.csv', parse_dates=['timestamp'])
df
[13]:
Dask DataFrame 结构
timestamp id name x y
npartitions=30
datetime64[ns] int64 object float64 float64
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask 名称:read-csv, 30 tasks

执行简单计算

每当我们对 dataframe 进行操作时,我们都会读取所有的 CSV 数据,这样就不会耗尽 RAM。这对于内存使用非常高效,但每次都读取所有 CSV 文件可能会很慢。

[14]:
%time df.groupby('name').x.mean().compute()
CPU times: user 211 ms, sys: 20.6 ms, total: 232 ms
Wall time: 3.26 s
[14]:
name
Alice       0.004810
Bob        -0.000236
Charlie    -0.003038
Dan         0.002005
Edith      -0.001287
Frank       0.000691
George     -0.002461
Hannah     -0.004205
Ingrid      0.001781
Jerry      -0.000149
Kevin       0.000707
Laura       0.002090
Michael    -0.004071
Norbert    -0.001131
Oliver     -0.002930
Patricia    0.000120
Quinn       0.000870
Ray         0.000424
Sarah      -0.000817
Tim         0.003061
Ursula      0.002109
Victor     -0.001035
Wendy      -0.002654
Xavier      0.000702
Yvonne      0.000308
Zelda      -0.001066
Name: x, dtype: float64
[ ]:

写入 Parquet

相反,我们将数据存储在 Parquet 中,这是一种更高效的计算机读写格式。

[15]:
df.to_parquet('data/2000-01.parquet', engine='pyarrow')
[16]:
!ls data/2000-01.parquet/
part.0.parquet   part.16.parquet  part.23.parquet  part.4.parquet
part.1.parquet   part.17.parquet  part.24.parquet  part.5.parquet
part.10.parquet  part.18.parquet  part.25.parquet  part.6.parquet
part.11.parquet  part.19.parquet  part.26.parquet  part.7.parquet
part.12.parquet  part.2.parquet   part.27.parquet  part.8.parquet
part.13.parquet  part.20.parquet  part.28.parquet  part.9.parquet
part.14.parquet  part.21.parquet  part.29.parquet
part.15.parquet  part.22.parquet  part.3.parquet

从 Parquet 读取

[17]:
df = dd.read_parquet('data/2000-01.parquet', engine='pyarrow')
df
[17]:
Dask DataFrame 结构
timestamp id name x y
npartitions=30
datetime64[ns] int64 object float64 float64
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask 名称:read-parquet, 30 tasks
[18]:
%time df.groupby('name').x.mean().compute()
CPU times: user 132 ms, sys: 13.1 ms, total: 145 ms
Wall time: 942 ms
[18]:
name
Alice       0.004810
Bob        -0.000236
Charlie    -0.003038
Dan         0.002005
Edith      -0.001287
Frank       0.000691
George     -0.002461
Hannah     -0.004205
Ingrid      0.001781
Jerry      -0.000149
Kevin       0.000707
Laura       0.002090
Michael    -0.004071
Norbert    -0.001131
Oliver     -0.002930
Patricia    0.000120
Quinn       0.000870
Ray         0.000424
Sarah      -0.000817
Tim         0.003061
Ursula      0.002109
Victor     -0.001035
Wendy      -0.002654
Xavier      0.000702
Yvonne      0.000308
Zelda      -0.001066
Name: x, dtype: float64

仅选择您计划使用的列

Parquet 是一种列式存储格式,这意味着它可以高效地仅从您的数据集中提取少量列。这很有用,因为它有助于避免不必要的数据加载。

[19]:
%%time
df = dd.read_parquet('data/2000-01.parquet', columns=['name', 'x'], engine='pyarrow')
df.groupby('name').x.mean().compute()
CPU times: user 130 ms, sys: 6.46 ms, total: 136 ms
Wall time: 851 ms
[19]:
name
Alice       0.004810
Bob        -0.000236
Charlie    -0.003038
Dan         0.002005
Edith      -0.001287
Frank       0.000691
George     -0.002461
Hannah     -0.004205
Ingrid      0.001781
Jerry      -0.000149
Kevin       0.000707
Laura       0.002090
Michael    -0.004071
Norbert    -0.001131
Oliver     -0.002930
Patricia    0.000120
Quinn       0.000870
Ray         0.000424
Sarah      -0.000817
Tim         0.003061
Ursula      0.002109
Victor     -0.001035
Wendy      -0.002654
Xavier      0.000702
Yvonne      0.000308
Zelda      -0.001066
Name: x, dtype: float64

这里的差异不是很大,但对于更大的数据集,这可以节省大量时间。