DataFrames:读写数据
目录
在线 Notebook
您可以在 在线会话 中运行此 notebook 或在 Github 上查看。
DataFrames:读写数据¶
Dask DataFrames 可以以与 Pandas DataFrames 相同的许多格式读写数据。在此示例中,我们使用流行的 CSV 和 Parquet 格式读写数据,并讨论使用这些格式时的最佳实践。
[1]:
from IPython.display import YouTubeVideo
YouTubeVideo("0eEsIA0O1iE")
[1]:
启动 Dask 客户端用于仪表盘¶
启动 Dask 客户端是可选的。它将提供一个仪表盘,有助于了解计算情况。
创建下面的客户端后,仪表盘的链接将可见。我们建议在屏幕的一侧打开它,而在另一侧使用 notebook。这可能需要一些精力来调整窗口,但同时看到它们对于学习非常有用。
[2]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[2]:
客户端
Client-e20d2897-0de0-11ed-a12a-000d3a8f7959
连接方法: 集群对象 | 集群类型: distributed.LocalCluster |
仪表盘: http://127.0.0.1:8787/status |
集群信息
LocalCluster
6de9824d
仪表盘: http://127.0.0.1:8787/status | 工作节点 1 |
总线程数 4 | 总内存: 1.86 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-f935f86a-49c7-4bf7-a1cf-50e7d7648a8e
通信: tcp://127.0.0.1:46871 | 工作节点 1 |
仪表盘: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 1.86 GiB |
工作节点
工作节点:0
通信: tcp://127.0.0.1:42199 | 总线程数 4 |
仪表盘: http://127.0.0.1:44125/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:45137 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-n5409ak8 |
创建人工数据集¶
首先,我们创建一个人工数据集,并将其写入多个 CSV 文件。
您无需理解本节,我们只是为 notebook 的其余部分创建数据集。
[3]:
import dask
df = dask.datasets.timeseries()
df
[3]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
[4]:
import os
import datetime
if not os.path.exists('data'):
os.mkdir('data')
def name(i):
""" Provide date for filename given index
Examples
--------
>>> name(0)
'2000-01-01'
>>> name(10)
'2000-01-11'
"""
return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))
df.to_csv('data/*.csv', name_function=name);
读取 CSV 文件¶
现在我们的数据目录中有许多 CSV 文件,每个文件对应 2000 年 1 月份的一天。每个 CSV 文件包含那一天的时序数据。我们可以使用带有 glob 字符串的 dd.read_csv
函数将所有这些文件作为一个逻辑 dataframe 读取。
[5]:
!ls data/*.csv | head
data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
[6]:
!head data/2000-01-01.csv
timestamp,id,name,x,y
2000-01-01 00:00:00,1009,Jerry,0.9005427499558429,0.3212344670325944
2000-01-01 00:00:01,940,Quinn,0.46795036754868247,-0.01884571513893385
2000-01-01 00:00:02,1017,Ingrid,0.9442706585905265,-0.9229268785155369
2000-01-01 00:00:03,1034,Tim,0.010273653581192255,-0.2850042344432575
2000-01-01 00:00:04,963,Bob,-0.9556052127604173,-0.409805293606079
2000-01-01 00:00:05,992,Ray,0.49090905386189876,-0.8364030355424359
2000-01-01 00:00:06,999,Ray,-0.1791414361782142,0.9108295350480047
2000-01-01 00:00:07,1017,Tim,-0.6121437272121055,0.5585754365941122
2000-01-01 00:00:08,1037,Dan,-0.6931099564135064,-0.6357258139372404
[7]:
!head data/2000-01-30.csv
timestamp,id,name,x,y
2000-01-30 00:00:00,1067,Quinn,-0.9275010814781244,0.7051035850972305
2000-01-30 00:00:01,1011,Quinn,-0.8288674460103511,-0.3018417020358921
2000-01-30 00:00:02,933,Laura,-0.5165326137868189,0.9195088929096915
2000-01-30 00:00:03,1040,Ray,0.8073954879070395,0.9243639047927026
2000-01-30 00:00:04,963,Wendy,0.791167365074305,0.2941664104084778
2000-01-30 00:00:05,1008,Bob,0.38959445411393334,-0.32793662786416844
2000-01-30 00:00:06,1008,Ray,-0.2127878456673038,0.040117377007003796
2000-01-30 00:00:07,1038,Ingrid,0.3092567914432629,0.11665005655447458
2000-01-30 00:00:08,985,Hannah,-0.42749597352375934,-0.3888014211219375
我们可以使用 pandas.read_csv
读取单个文件,或使用 dask.dataframe.read_csv
读取多个文件。
[8]:
import pandas as pd
df = pd.read_csv('data/2000-01-01.csv')
df.head()
[8]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
0 | 2000-01-01 00:00:00 | 1009 | Jerry | 0.900543 | 0.321234 |
1 | 2000-01-01 00:00:01 | 940 | Quinn | 0.467950 | -0.018846 |
2 | 2000-01-01 00:00:02 | 1017 | Ingrid | 0.944271 | -0.922927 |
3 | 2000-01-01 00:00:03 | 1034 | Tim | 0.010274 | -0.285004 |
4 | 2000-01-01 00:00:04 | 963 | Bob | -0.955605 | -0.409805 |
[9]:
import dask.dataframe as dd
df = dd.read_csv('data/2000-*-*.csv')
df
[9]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
npartitions=30 | |||||
object | int64 | object | float64 | float64 | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
[10]:
df.head()
[10]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
0 | 2000-01-01 00:00:00 | 1009 | Jerry | 0.900543 | 0.321234 |
1 | 2000-01-01 00:00:01 | 940 | Quinn | 0.467950 | -0.018846 |
2 | 2000-01-01 00:00:02 | 1017 | Ingrid | 0.944271 | -0.922927 |
3 | 2000-01-01 00:00:03 | 1034 | Tim | 0.010274 | -0.285004 |
4 | 2000-01-01 00:00:04 | 963 | Bob | -0.955605 | -0.409805 |
调整 read_csv¶
Pandas 的 read_csv
函数有许多选项可帮助您解析文件。Dask 版本内部使用 Pandas 函数,因此支持许多相同的选项。您可以使用 ?
运算符查看完整的文档字符串。
[11]:
pd.read_csv?
[12]:
dd.read_csv?
在这种情况下,我们使用 parse_dates
关键字将 timestamp 列解析为 datetime 类型。这将使未来操作更高效。请注意,timestamp 列的 dtype 已从 object
更改为 datetime64[ns]
。
[13]:
df = dd.read_csv('data/2000-*-*.csv', parse_dates=['timestamp'])
df
[13]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
npartitions=30 | |||||
datetime64[ns] | int64 | object | float64 | float64 | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
执行简单计算¶
每当我们对 dataframe 进行操作时,我们都会读取所有的 CSV 数据,这样就不会耗尽 RAM。这对于内存使用非常高效,但每次都读取所有 CSV 文件可能会很慢。
[14]:
%time df.groupby('name').x.mean().compute()
CPU times: user 211 ms, sys: 20.6 ms, total: 232 ms
Wall time: 3.26 s
[14]:
name
Alice 0.004810
Bob -0.000236
Charlie -0.003038
Dan 0.002005
Edith -0.001287
Frank 0.000691
George -0.002461
Hannah -0.004205
Ingrid 0.001781
Jerry -0.000149
Kevin 0.000707
Laura 0.002090
Michael -0.004071
Norbert -0.001131
Oliver -0.002930
Patricia 0.000120
Quinn 0.000870
Ray 0.000424
Sarah -0.000817
Tim 0.003061
Ursula 0.002109
Victor -0.001035
Wendy -0.002654
Xavier 0.000702
Yvonne 0.000308
Zelda -0.001066
Name: x, dtype: float64
[ ]:
写入 Parquet¶
相反,我们将数据存储在 Parquet 中,这是一种更高效的计算机读写格式。
[15]:
df.to_parquet('data/2000-01.parquet', engine='pyarrow')
[16]:
!ls data/2000-01.parquet/
part.0.parquet part.16.parquet part.23.parquet part.4.parquet
part.1.parquet part.17.parquet part.24.parquet part.5.parquet
part.10.parquet part.18.parquet part.25.parquet part.6.parquet
part.11.parquet part.19.parquet part.26.parquet part.7.parquet
part.12.parquet part.2.parquet part.27.parquet part.8.parquet
part.13.parquet part.20.parquet part.28.parquet part.9.parquet
part.14.parquet part.21.parquet part.29.parquet
part.15.parquet part.22.parquet part.3.parquet
从 Parquet 读取¶
[17]:
df = dd.read_parquet('data/2000-01.parquet', engine='pyarrow')
df
[17]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
npartitions=30 | |||||
datetime64[ns] | int64 | object | float64 | float64 | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
[18]:
%time df.groupby('name').x.mean().compute()
CPU times: user 132 ms, sys: 13.1 ms, total: 145 ms
Wall time: 942 ms
[18]:
name
Alice 0.004810
Bob -0.000236
Charlie -0.003038
Dan 0.002005
Edith -0.001287
Frank 0.000691
George -0.002461
Hannah -0.004205
Ingrid 0.001781
Jerry -0.000149
Kevin 0.000707
Laura 0.002090
Michael -0.004071
Norbert -0.001131
Oliver -0.002930
Patricia 0.000120
Quinn 0.000870
Ray 0.000424
Sarah -0.000817
Tim 0.003061
Ursula 0.002109
Victor -0.001035
Wendy -0.002654
Xavier 0.000702
Yvonne 0.000308
Zelda -0.001066
Name: x, dtype: float64
仅选择您计划使用的列¶
Parquet 是一种列式存储格式,这意味着它可以高效地仅从您的数据集中提取少量列。这很有用,因为它有助于避免不必要的数据加载。
[19]:
%%time
df = dd.read_parquet('data/2000-01.parquet', columns=['name', 'x'], engine='pyarrow')
df.groupby('name').x.mean().compute()
CPU times: user 130 ms, sys: 6.46 ms, total: 136 ms
Wall time: 851 ms
[19]:
name
Alice 0.004810
Bob -0.000236
Charlie -0.003038
Dan 0.002005
Edith -0.001287
Frank 0.000691
George -0.002461
Hannah -0.004205
Ingrid 0.001781
Jerry -0.000149
Kevin 0.000707
Laura 0.002090
Michael -0.004071
Norbert -0.001131
Oliver -0.002930
Patricia 0.000120
Quinn 0.000870
Ray 0.000424
Sarah -0.000817
Tim 0.003061
Ursula 0.002109
Victor -0.001035
Wendy -0.002654
Xavier 0.000702
Yvonne 0.000308
Zelda -0.001066
Name: x, dtype: float64
这里的差异不是很大,但对于更大的数据集,这可以节省大量时间。