从 Pandas 到 Dask 的陷阱¶
本 notebook 重点介绍了将代码从 Pandas 迁移到 Dask 环境中运行时的一些关键差异。
[1]:
# since Dask is actively being developed - the current example is running with the below version
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Dask version: 2022.05.0
Pandas version: 1.4.2
启动 Dask Client 以访问仪表盘¶
本示例在 LocalCluster 上运行,这将提供一个仪表盘,有助于深入了解计算过程。在 Jupyter Lab 中运行时,可以安装一个扩展来查看各种仪表盘小部件。
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:
客户端: Client-03e84462-0de1-11ed-a1e8-000d3a8f7959
连接方法: Cluster object | 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status
集群信息: LocalCluster 45bdf0f8
仪表盘: http://127.0.0.1:8787/status | 工作节点: 2
总线程数: 2 | 总内存: 6.78 GiB
状态: 运行中 | 使用进程: True
调度器信息: Scheduler-1dfabd40-56f7-4aad-a87c-63bd33674848
通信: tcp://127.0.0.1:36429 | 工作节点: 2
仪表盘: http://127.0.0.1:8787/status | 总线程数: 2
启动时间: 刚刚 | 总内存: 6.78 GiB
工作节点: 0
通信: tcp://127.0.0.1:39049 | 总线程数: 1
仪表盘: http://127.0.0.1:37891/status | 内存: 3.39 GiB
Nanny: tcp://127.0.0.1:41889
本地目录: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-t0q30i9q
工作节点: 1
通信: tcp://127.0.0.1:36835 | 总线程数: 1
仪表盘: http://127.0.0.1:36827/status | 内存: 3.39 GiB
Nanny: tcp://127.0.0.1:43323
本地目录: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-uo70pumh
创建两个 DataFrame 进行比较:¶
1. 用于 Dask
2. 用于 Pandas
Dask 自带内置数据集样本,我们将使用此样本进行示例。
[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
请记住,Dask framework 是惰性的,因此要查看结果,我们需要运行 compute()(或 head(),它在底层运行 compute())。
[4]:
ddf.head(2)
[4]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
为了创建 Pandas dataframe,我们可以使用 Dask dataframe 的 compute() 方法。
[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
在使用 shape 属性时,我们也可以看到 Dask 的惰性:
[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-cfb1b2b5-09dd-494e-b2d2-f875ace2562d'), 4)
在访问所有分区之前,我们无法获得完整的 shape,而运行 len 将会访问所有分区。
[7]:
print(f'Dask computed shape: {len(ddf.index):,}') # expensive
Dask computed shape: 2,592,000
从 Pandas 创建 Dask dataframe¶
为了在现有的 Pandas dataframe (pdf) 上利用 Dask 的能力,我们需要使用 from_pandas 方法将 Pandas dataframe 转换为 Dask dataframe (ddf)。您必须提供用于生成 dask dataframe 的分区数 (npartitions) 或 chunksize。
[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
id | name | x | y | |
---|---|---|---|---|
npartitions=10 | ||||
2000-01-01 00:00:00 | int64 | object | float64 | float64 |
2000-01-04 00:00:00 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-28 00:00:00 | ... | ... | ... | ... |
2000-01-30 23:59:59 | ... | ... | ... | ... |
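作为补充,下面是一个简要示意(ddf3 为本文未出现的假设变量名):除 npartitions 外,也可以通过 chunksize 指定每个分区包含的行数,二者只需提供其一。
# 示意:用 chunksize(每个分区的行数)代替 npartitions 进行切分
ddf3 = dd.from_pandas(pdf, chunksize=500_000)
print(ddf3.npartitions)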
Dask Dataframes 中的分区¶
请注意,创建 Dask dataframe 时,需要提供 npartitions 参数。分区数将帮助 Dask 决定如何分解 Pandas DataFrame 并并行化计算,每个分区都是一个独立的 dataframe。检查 reset_index() 方法时可以看到这方面的一个示例:
[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp 2000-01-01 00:00:00
id 983
name Wendy
x -0.303374
y -0.423744
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
0 | 2000-01-01 | 983 | Wendy | -0.303374 | -0.423744 |
0 | 2000-01-04 | 1002 | Kevin | -0.825578 | -0.584699 |
0 | 2000-01-07 | 963 | Oliver | 0.024036 | -0.692546 |
0 | 2000-01-10 | 1023 | Yvonne | 0.897486 | 0.958034 |
0 | 2000-01-13 | 1088 | Quinn | -0.721954 | 0.261693 |
0 | 2000-01-16 | 994 | George | 0.463023 | -0.166976 |
0 | 2000-01-19 | 932 | Frank | 0.272315 | -0.585240 |
0 | 2000-01-22 | 1007 | Ursula | -0.919138 | -0.173157 |
0 | 2000-01-25 | 983 | Patricia | -0.893431 | -0.892484 |
0 | 2000-01-28 | 1043 | Oliver | -0.979336 | -0.581927 |
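若想了解数据是如何被切分的,可以参考下面的简要示意来检查分区信息(均为 Dask dataframe 的常规属性和方法,具体输出取决于数据本身):
# 示意:检查分区数、分区边界以及每个分区的行数
print(ddf2.npartitions)                      # 分区数
print(ddf2.divisions)                        # 各分区的索引边界(reset_index 之后可能为 None)
print(ddf2.map_partitions(len).compute())    # 每个分区的行数(惰性计算,需要 compute)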
Dask Dataframe 与 Pandas Dataframe¶
现在我们有了 dask (ddf) 和 pandas (pdf) dataframe,我们可以开始比较它们之间的交互。
概念转变 - 从更新到插入/删除¶
Dask 不支持就地更新,因此不存在像 inplace=True 这样的参数。
重命名列¶
使用 inplace=True 不被认为是最佳实践。
[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
数据操作¶
操作数据时存在一些差异。
loc - Pandas¶
[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:06 | 1019 | Xavier | 0.634802 | 45.051214 |
2000-01-01 00:00:08 | 1013 | Charlie | 0.627523 | -8.101142 |
错误¶
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [14], in <cell line: 2>()
1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
3 ddf[cond_ddf].head(2)
TypeError: '_LocIndexer' object does not support item assignment
Dask - 使用 mask/where¶
[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
更多信息请参阅 dask mask 文档。
Meta 参数¶
由于 Dask 为计算创建了一个 DAG,它需要了解每个计算阶段输出的列名和类型;meta 参数正是用来描述这些输出的。
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3946: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
Before: .apply(func)
After: .apply(func, meta=('name', 'object'))
warnings.warn(meta_warning(meta))
[17]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[18]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[20]:
# similar when using a function
def func(row):
if (row['x']> 0):
return row['x'] * 1000
else:
return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID | name | x | y | initials | z | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 21.914646 |
Map partitions¶
我们可以使用 map_partitions 方法提供一个临时函数,在每个分区上运行,主要用于 Dask 或 Pandas 中未实现的函数。最后,我们可以返回一个新的 dataframe,并需要在 meta 参数中对其进行描述。该函数也可以接受参数。
[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
+ (df[coor_y] - df[coor_y].shift())**2 )
return df.drop(drop_cols, axis=1)
ddf2 = ddf.map_partitions(func2
, coor_x='x'
, coor_y='y'
, drop_cols=['initials', 'z']
, meta=pd.DataFrame({'ID':'i8'
, 'name':str
, 'x':'f8'
, 'y':'f8'
, 'dist':'f8'}, index=[0]))
ddf2.head()
[22]:
ID | name | x | y | dist | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | NaN |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 1.063636 |
2000-01-01 00:00:02 | 996 | Kevin | 0.336184 | 0.150478 | 0.539449 |
2000-01-01 00:00:03 | 1035 | Quinn | 0.853655 | 0.031222 | 0.531035 |
2000-01-01 00:00:04 | 1039 | Ingrid | 0.890711 | -0.992794 | 1.024686 |
将索引转换为时间列¶
[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID | name | x | y | initials | times | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 00:00:01 |
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf['times'] = ddf['times'].dt.time
ddf =client.persist(ddf)
ddf.head(2)
[24]:
ID | name | x | y | initials | z | times | |
---|---|---|---|---|---|---|---|
timestamp | |||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 21.914646 | 00:00:01 |
在列上删除 NA¶
[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
为了让 Dask 删除一个所有值均为 na 的列,它必须使用 compute() 检查所有分区。
[28]:
if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive
ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
[28]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
重置索引¶
[29]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
读取 / 保存文件¶
使用 pandas 和 dask 时,首选使用 parquet 格式。使用 Dask 时,可以使用多个工作节点读取文件。大多数 kwargs 对读写文件同样适用,例如 ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False)。但是,有些参数不可用,例如 nrows。
请参阅文档(包括输出文件命名选项)。
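文中提到首选 parquet 格式,但本文并未给出相应代码;下面是一个简要示意(输出路径与 ddf_pq 变量名均为假设,需要安装 pyarrow 或 fastparquet,这里只取几列数值列作演示):
# 示意:以 parquet 格式写出并读回 Dask dataframe
ddf[['ID', 'x', 'y']].to_parquet('data/ddf_parquet')   # 假设的输出目录
ddf_pq = dd.read_parquet('data/ddf_parquet')           # 读取时同样按分区并行
ddf_pq.head(2)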
保存文件¶
[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 16.4 s, sys: 433 ms, total: 16.8 s
Wall time: 16.6 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-25.csv'),
PosixPath('data/2000-01-20.csv'),
PosixPath('data/2000-01-29.csv'),
PosixPath('data/2000-01-02.csv'),
PosixPath('data/2000-01-19.csv'),
PosixPath('data/2000-01-23.csv'),
PosixPath('data/2000-01-10.csv'),
PosixPath('data/2000-01-21.csv'),
PosixPath('data/2000-01-17.csv'),
PosixPath('data/2000-01-04.csv'),
PosixPath('data/2000-01-27.csv'),
PosixPath('data/2000-01-22.csv'),
PosixPath('data/2000-01-14.csv'),
PosixPath('data/2000-01-11.csv'),
PosixPath('data/pdf_single_file.csv'),
PosixPath('data/2000-01-13.csv'),
PosixPath('data/2000-01-08.csv'),
PosixPath('data/2000-01-09.csv'),
PosixPath('data/2000-01-06.csv'),
PosixPath('data/2000-01-01.csv'),
PosixPath('data/2000-01-07.csv'),
PosixPath('data/2000-01-12.csv'),
PosixPath('data/2000-01-16.csv'),
PosixPath('data/2000-01-26.csv'),
PosixPath('data/2000-01-24.csv'),
PosixPath('data/2000-01-18.csv'),
PosixPath('data/2000-01-15.csv'),
PosixPath('data/2000-01-03.csv'),
PosixPath('data/2000-01-30.csv'),
PosixPath('data/2000-01-28.csv'),
PosixPath('data/2000-01-05.csv')]
注意 '*' 允许为多个输出文件命名。
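如果希望 Dask 只写出单个 CSV 文件而非每个分区一个文件,可以参考下面的简要示意(single_file=True 会在写出前合并各分区,对大数据集可能较慢;文件路径为假设示例):
# 示意:写出单个 CSV 文件
ddf.to_csv('data/ddf_single.csv', single_file=True, index=False)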
[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 454 ms, sys: 46.5 ms, total: 500 ms
Wall time: 10.4 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']
要查找将确定输出文件数量的分区数,请使用 dask.dataframe.npartitions
要更改输出文件数,请使用 repartition,这是一项昂贵的操作。
[36]:
ddf.npartitions
[36]:
30
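下面是一个简要示意(ddf_5 与输出文件名均为假设):先用 repartition 改变分区数,再写出,即可控制输出文件的数量。
# 示意:将 30 个分区重新划分为 5 个,再写出 5 个 CSV 文件(repartition 代价较高)
ddf_5 = ddf.repartition(npartitions=5)
ddf_5.to_csv(f'{output_dask_dir}/ddf_5parts_*.csv', index=False)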
读取多个文件¶
对于 pandas,可以迭代并连接文件,请参阅 stack overflow 回答。
[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
for f in list(output_dask_dir.iterdir())])
len(concat_df)
CPU times: user 2.75 s, sys: 318 ms, total: 3.07 s
Wall time: 3 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms
Wall time: 12.4 ms
[38]:
ID | name | x | y | times | |
---|---|---|---|---|---|
npartitions=30 | |||||
int64 | object | float64 | float64 | object | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
请记住 Dask 是惰性的,因此它不会真正读取文件,直到需要时才读取……
[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 69.6 ms, sys: 11.3 ms, total: 81 ms
Wall time: 818 ms
[39]:
2592000
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
Group By - 自定义聚合¶
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name | ID | seconds | |
---|---|---|---|
0 | Wendy | 983 | 00 |
1 | Jerry | 964 | 00 |
2 | Kevin | 996 | 00 |
3 | Quinn | 1035 | 00 |
4 | Ingrid | 1039 | 00 |
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
(lambda x: list(set(x.to_list())))
for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
Weight ID \
name
Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
seconds
name
Alice [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
Bob [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
CPU times: user 23.1 ms, sys: 169 µs, total: 23.3 ms
Wall time: 22.1 ms
请记住,在某些情况下 Pandas 更高效(前提是您可以将所有数据加载到 RAM 中)。
[44]:
def set_list_att(x: dd.Series):
return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name | ID | seconds | |
---|---|---|---|
0 | Wendy | 983 | 00 |
1 | Jerry | 964 | 01 |
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File <timed exec>:4, in <module>
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1213, in _Frame.head(self, n, npartitions, compute)
1211 # No need to warn if we're already looking at all partitions
1212 safe = npartitions != self.npartitions
-> 1213 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1247, in _Frame._head(self, n, npartitions, compute, safe)
1242 result = new_dd_object(
1243 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1244 )
1246 if compute:
-> 1247 result = result.compute()
1248 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
268 def compute(self, **kwargs):
269 """Compute this dask collection
270
271 This turns a lazy Dask collection into its in-memory equivalent.
(...)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3002 should_rejoin = False
3003 try:
-> 3004 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3005 finally:
3006 for f in futures.values():
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
2176 else:
2177 local_worker = None
-> 2178 return self.sync(
2179 self._gather,
2180 futures,
2181 errors=errors,
2182 direct=direct,
2183 local_worker=local_worker,
2184 asynchronous=asynchronous,
2185 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
316 return future
317 else:
--> 318 return sync(
319 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
320 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
383 if error:
384 typ, exc, tb = error
--> 385 raise exc.with_traceback(tb)
386 else:
387 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
356 future = asyncio.wait_for(future, callback_timeout)
357 future = asyncio.ensure_future(future)
--> 358 result = yield future
359 except Exception:
360 error = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
2039 exc = CancelledError(key)
2040 else:
-> 2041 raise exception.with_traceback(traceback)
2042 raise exc
2043 if errors == "skip":
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
38 def apply(func, args, kwargs=None):
39 if kwargs:
---> 40 return func(*args, **kwargs)
41 else:
42 return func(*args)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
6434 return meta
6435 if is_dataframe_like(df):
-> 6436 check_matching_columns(meta, df)
6437 c = meta.columns
6438 else:
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
413 else:
414 extra_info = "Order of columns does not match"
--> 415 raise ValueError(
416 "The columns in the computed data do not match"
417 " the columns in the provided metadata\n"
418 f"{extra_info}"
419 )
ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: ['name']
Missing: [0]
[48]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),                  # chunk 步骤:在每个分区内对每组取集合
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),  # agg 步骤:合并各分区的集合
)
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 185 ms, sys: 11.3 ms, total: 196 ms
Wall time: 1.33 s
[49]:
Weight | ID | seconds | |
---|---|---|---|
name | |||
Alice | 99833 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0... |
Bob | 99508 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0... |
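作为补充,dd.Aggregation 还支持可选的第三个 finalize 步骤;下面是一个简要示意(custom_mean 为假设名称,逻辑参照 Dask 文档中的均值示例,仅作演示,内置的 mean 更高效):
# 示意:用 chunk / agg / finalize 三步自定义一个"均值"聚合
custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),                    # chunk:每个分区内分组计数与求和
    lambda count, total: (count.sum(), total.sum()),   # agg:跨分区合并中间结果
    lambda count, total: total / count,                # finalize:得到最终均值
)
ddf.groupby('name')['x'].agg(custom_mean).compute().head(2)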
调试¶
调试可能具有挑战性……
1. 在没有客户端的情况下运行代码(见下方示意)
2. 使用仪表盘分析器
3. 验证 DAG 的完整性
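针对第 1 点,下面是一个简要示意:向 compute() 传入 scheduler='synchronous',即可绕过分布式客户端,用单线程同步调度器在当前进程中执行,便于设置断点和阅读 traceback(ddf_debug 为假设的、未经 persist 的新 dataframe,取第一个分区以减小计算量):
# 示意:用同步(单线程)调度器在当前进程中计算,方便 pdb 调试
ddf_debug = dask.datasets.timeseries()
ddf_debug.get_partition(0).compute(scheduler='synchronous')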
损坏的 DAG¶
在此示例中,我们展示了 DAG 损坏后可能需要重置计算。
[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 | 996 | Ingrid | -0.932092 | 0.477965 |
[51]:
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax
+ (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
一切正常吗?
# 导致错误
ddf.head()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
即使函数被纠正,DAG 也已损坏。
[52]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax
+ (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
我们需要重置 dataframe
[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function
+ (df[coor_y] - df[coor_y].shift())**2 )
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
[53]:
id | name | x | y | col | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 979 | Frank | 0.820777 | 0.098616 | NaN |
2000-01-01 00:00:01 | 990 | Laura | -0.851323 | -0.501678 | 1.77659 |