实时 Notebook

您可以在实时会话中运行此 notebook Binder,或在 Github 上查看它。

从 Pandas 到 Dask 的陷阱

本 notebook 重点介绍将代码从 Pandas 迁移到 Dask 环境中运行时的一些关键差异。
大多数问题都提供了指向 Dask 文档的链接,以获取更多信息。
[1]:
# since Dask is activly beeing developed - the current example is running with the below version
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask versoin: {dask.__version__}')
print(f'Pandas versoin: {pd.__version__}')
Dask versoin: 2022.05.0
Pandas versoin: 1.4.2

启动 Dask Client 以访问仪表盘

启动 Dask Client 是可选的。在此示例中,我们在 LocalCluster 上运行,这将提供一个仪表盘,有助于深入了解计算过程。
创建客户端后(如下图所示),仪表盘链接将可见。
Jupyter Lab 中运行时,可以安装一个扩展来查看各种仪表盘小部件。
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:

客户端

Client-03e84462-0de1-11ed-a1e8-000d3a8f7959

连接方法: Cluster object 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

有关其他集群配置,请参阅文档

创建两个 DataFrame 进行比较:

  1. 对于 Dask

  2. 对于 Pandas Dask 自带内置数据集样本,我们将使用此样本进行示例。

[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask 名称:make-timeseries, 30 tasks
  • 请记住 Dask framework惰性的,因此要查看结果,我们需要运行 compute()(或 head(),它在底层运行 compute()) )

[4]:
ddf.head(2)
[4]:
id name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

为了创建 Pandas dataframe,我们可以使用 Dask dataframecompute() 方法

[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

在使用 shape 属性时,我们也可以看到dask 的惰性

[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-cfb1b2b5-09dd-494e-b2d2-f875ace2562d'), 4)

在访问所有分区之前,我们无法获得完整的 shape - 运行 len 将会这样做

[7]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive
Dask computed shape: 2,592,000

Pandas 创建 Dask dataframe

为了在现有的 Pandas dataframe (pdf) 上利用 Dask 的能力,我们需要使用 from_pandas 方法将 Pandas dataframe 转换为 Dask dataframe (ddf)。您必须提供用于生成 dask dataframe 的分区数或 chunksize

[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
Dask DataFrame 结构
id name x y
npartitions=10
2000-01-01 00:00:00 int64 object float64 float64
2000-01-04 00:00:00 ... ... ... ...
... ... ... ... ...
2000-01-28 00:00:00 ... ... ... ...
2000-01-30 23:59:59 ... ... ... ...
Dask 名称:from_pandas, 10 tasks

Dask Dataframes 中的分区

注意,当我们创建 Dask dataframe 时,需要提供 npartitions 参数。
分区数将帮助 Dask 如何分解 Pandas Datafram 并并行化计算。
每个分区都是一个独立的 dataframe。有关更多信息,请参阅分区文档

检查 reset_ index() 方法时可以看到这方面的一个示例

[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp    2000-01-01 00:00:00
id                           983
name                       Wendy
x                      -0.303374
y                      -0.423744
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp id name x y
0 2000-01-01 983 Wendy -0.303374 -0.423744
0 2000-01-04 1002 Kevin -0.825578 -0.584699
0 2000-01-07 963 Oliver 0.024036 -0.692546
0 2000-01-10 1023 Yvonne 0.897486 0.958034
0 2000-01-13 1088 Quinn -0.721954 0.261693
0 2000-01-16 994 George 0.463023 -0.166976
0 2000-01-19 932 Frank 0.272315 -0.585240
0 2000-01-22 1007 Ursula -0.919138 -0.173157
0 2000-01-25 983 Patricia -0.893431 -0.892484
0 2000-01-28 1043 Oliver -0.979336 -0.581927

Dask Dataframe 与 Pandas Dataframe

现在我们有了 dask (ddf) 和 pandas (pdf) dataframe,我们可以开始比较它们之间的交互。

概念转变 - 从更新到插入/删除

Dask 不进行更新 - 因此没有像 Pandas 中存在的 inplace=True 这样的参数。
更多详情请参阅github 上的 issue#653

重命名列

  • 使用 inplace=True 不被认为是最佳实践

[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
# Dask - 错误 # ddf.rename(columns={'id':'ID'}, inplace=True) # ddf.columns ''' python --------------------------------------------------------------------------- TypeError Traceback (most recent call last)1 # Dask - 错误 ----> 2 ddf.rename(columns={'id':'ID'}, inplace=True) 3 ddf.columns TypeError: rename() got an unexpected keyword argument 'inplace' '''
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')

数据操作

操作数据时存在一些差异。

loc - Pandas

[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID name x y
timestamp
2000-01-01 00:00:06 1019 Xavier 0.634802 45.051214
2000-01-01 00:00:08 1013 Charlie 0.627523 -8.101142

错误

cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

Dask - 使用 mask/where

[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

更多信息请参阅dask mask 文档

Meta 参数

Dask 的一个关键特性是引入了 meta 参数。
> meta 规定了计算输出的名称/类型
由于 Dask 为计算创建了一个 DAG,它需要了解每个计算阶段的输出是什么。
有关更多信息,请参阅meta 文档
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3946: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('name', 'object'))

  warnings.warn(meta_warning(meta))
[17]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[18]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[20]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000
    else:
        return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID name x y initials z
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646

Map partitions

  • 我们可以使用 map_partitions 方法提供一个临时函数来在每个分区上运行。主要用于 DaskPandas 中未实现的函数。

  • 最后,我们可以返回一个新的 dataframe,需要在 meta 参数中对其进行描述。该函数也可以包含参数。

[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
                           +  (df[coor_y] - df[coor_y].shift())**2 )
    return df.drop(drop_cols, axis=1)

ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          , meta=pd.DataFrame({'ID':'i8'
                                              , 'name':str
                                              , 'x':'f8'
                                              , 'y':'f8'
                                              , 'dist':'f8'}, index=[0]))
ddf2.head()
[22]:
ID name x y dist
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 NaN
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 1.063636
2000-01-01 00:00:02 996 Kevin 0.336184 0.150478 0.539449
2000-01-01 00:00:03 1035 Quinn 0.853655 0.031222 0.531035
2000-01-01 00:00:04 1039 Ingrid 0.890711 -0.992794 1.024686

将索引转换为时间列

[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID name x y initials times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 00:00:01
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or  ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf['times'] = ddf['times'].dt.time
ddf =client.persist(ddf)
ddf.head(2)
[24]:
ID name x y initials z times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646 00:00:01

在列上删除 NA

[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID name x y times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01

为了让 Dask 删除一个包含所有 na 的列,它必须使用 compute() 检查所有分区

[28]:
if ddf.colna.isnull().all().compute() == True:   # check if all values in column are Null -  expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
[28]:
ID name x y times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01

1.4 重置索引

[29]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01

读取 / 保存文件

  • 使用 pandasdask 时,首选使用 parquet 格式

  • 使用 Dask 时 - 可以使用多个工作节点读取文件。

  • 大多数 kwargs 适用于读写文件,例如 ddf = dd.read_csv(’data/pd2dd/ddf*.csv’, compression=‘gzip’, header=False)。

  • 但是,有些参数不可用,例如 nrows

请参阅文档(包括输出文件命名选项)。

保存文件

[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 16.4 s, sys: 433 ms, total: 16.8 s
Wall time: 16.6 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-25.csv'),
 PosixPath('data/2000-01-20.csv'),
 PosixPath('data/2000-01-29.csv'),
 PosixPath('data/2000-01-02.csv'),
 PosixPath('data/2000-01-19.csv'),
 PosixPath('data/2000-01-23.csv'),
 PosixPath('data/2000-01-10.csv'),
 PosixPath('data/2000-01-21.csv'),
 PosixPath('data/2000-01-17.csv'),
 PosixPath('data/2000-01-04.csv'),
 PosixPath('data/2000-01-27.csv'),
 PosixPath('data/2000-01-22.csv'),
 PosixPath('data/2000-01-14.csv'),
 PosixPath('data/2000-01-11.csv'),
 PosixPath('data/pdf_single_file.csv'),
 PosixPath('data/2000-01-13.csv'),
 PosixPath('data/2000-01-08.csv'),
 PosixPath('data/2000-01-09.csv'),
 PosixPath('data/2000-01-06.csv'),
 PosixPath('data/2000-01-01.csv'),
 PosixPath('data/2000-01-07.csv'),
 PosixPath('data/2000-01-12.csv'),
 PosixPath('data/2000-01-16.csv'),
 PosixPath('data/2000-01-26.csv'),
 PosixPath('data/2000-01-24.csv'),
 PosixPath('data/2000-01-18.csv'),
 PosixPath('data/2000-01-15.csv'),
 PosixPath('data/2000-01-03.csv'),
 PosixPath('data/2000-01-30.csv'),
 PosixPath('data/2000-01-28.csv'),
 PosixPath('data/2000-01-05.csv')]

注意 '*' 允许重命名多个文件。

[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 454 ms, sys: 46.5 ms, total: 500 ms
Wall time: 10.4 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']

要查找将确定输出文件数量的分区数,请使用 dask.dataframe.npartitions

要更改输出文件数,请使用 repartition,这是一项昂贵的操作。

[36]:
ddf.npartitions
[36]:
30

读取多个文件

对于 pandas,可以迭代并连接文件,请参阅 stack overflow 回答

[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)
CPU times: user 2.75 s, sys: 318 ms, total: 3.07 s
Wall time: 3 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms
Wall time: 12.4 ms
[38]:
Dask DataFrame 结构
ID name x y times
npartitions=30
int64 object float64 float64 object
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask 名称:read-csv, 30 tasks

请记住 Dask 是惰性的 - 因此它不会真正读取文件,直到需要时才读取…

[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 69.6 ms, sys: 11.3 ms, total: 81 ms
Wall time: 818 ms
[39]:
2592000
## 考虑使用 client.persist() 由于 Dask 是惰性的 - 即使它已经在之前的单元格中运行了部分计算,它也可能再次运行整个图/DAG。因此,请使用 persist 将结果保存在内存中
更多信息可以参阅此stackoverflow 问题,或在此帖子中查看示例
当在脚本中(而不是 jupyter notebook 中)运行包含循环的代码时,也应该使用这个概念。
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01

Group By - 自定义聚合

除了仓库中的groupby notebook 示例外 -
这是另一个示例,说明如何尝试避免使用 groupby.apply
在此示例中,我们将列分组为唯一列表。
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name ID seconds
0 Wendy 983 00
1 Jerry 964 00
2 Kevin 996 00
3 Quinn 1035 00
4 Ingrid 1039 00
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
       Weight                                                 ID  \
name
Alice   99833  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob     99508  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...

                                                 seconds
name
Alice  [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
Bob    [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
CPU times: user 23.1 ms, sys: 169 µs, total: 23.3 ms
Wall time: 22.1 ms
  • 请记住,在某些情况下,Pandas 更高效(假设您可以将所有数据加载到 RAM 中)。

[44]:
def set_list_att(x: dd.Series):
        return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name ID seconds
0 Wendy 983 00
1 Jerry 964 01
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:4, in <module>

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1213, in _Frame.head(self, n, npartitions, compute)
   1211 # No need to warn if we're already looking at all partitions
   1212 safe = npartitions != self.npartitions
-> 1213 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1247, in _Frame._head(self, n, npartitions, compute, safe)
   1242 result = new_dd_object(
   1243     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1244 )
   1246 if compute:
-> 1247     result = result.compute()
   1248 return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3002         should_rejoin = False
   3003 try:
-> 3004     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3005 finally:
   3006     for f in futures.values():

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
   2176 else:
   2177     local_worker = None
-> 2178 return self.sync(
   2179     self._gather,
   2180     futures,
   2181     errors=errors,
   2182     direct=direct,
   2183     local_worker=local_worker,
   2184     asynchronous=asynchronous,
   2185 )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    316     return future
    317 else:
--> 318     return sync(
    319         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    320     )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
    383 if error:
    384     typ, exc, tb = error
--> 385     raise exc.with_traceback(tb)
    386 else:
    387     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
    356         future = asyncio.wait_for(future, callback_timeout)
    357     future = asyncio.ensure_future(future)
--> 358     result = yield future
    359 except Exception:
    360     error = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
   2039         exc = CancelledError(key)
   2040     else:
-> 2041         raise exception.with_traceback(traceback)
   2042     raise exc
   2043 if errors == "skip":

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
     38 def apply(func, args, kwargs=None):
     39     if kwargs:
---> 40         return func(*args, **kwargs)
     41     else:
     42         return func(*args)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
   6434     return meta
   6435 if is_dataframe_like(df):
-> 6436     check_matching_columns(meta, df)
   6437     c = meta.columns
   6438 else:

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
    413 else:
    414     extra_info = "Order of columns does not match"
--> 415 raise ValueError(
    416     "The columns in the computed data do not match"
    417     " the columns in the provided metadata\n"
    418     f"{extra_info}"
    419 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['name']
  Missing: [0]
我们可以做得更好…
使用 dask custom aggregation 会好得多
[48]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 185 ms, sys: 11.3 ms, total: 196 ms
Wall time: 1.33 s
[49]:
Weight ID seconds
name
Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0...
Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0...

调试

调试可能具有挑战性… 1. 在没有客户端的情况下运行代码 2. 使用仪表盘分析器 3. 验证 DAG 的完整性

损坏的 DAG

在此示例中,我们展示了 DAG 损坏后可能需要重置计算。

[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id name x y
timestamp
2000-01-01 996 Ingrid -0.932092 0.477965
[51]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(

一切正常吗?

# 导致错误 ddf.head() --------------------------------------------------------------------------- TypeError Traceback (most recent call last)1 # 由于 ^2 而返回错误(需要是 **2) ----> 2 ddf.head() c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute) 898 899 if compute: --> 900 result = result.compute() 901 return result 902 c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs) 154 dask.base.compute 155 """ --> 156 (result,) = compute(self, traverse=False, **kwargs) 157 return result 158 pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() TypeError: unsupported operand type(s) for ^: 'float' and 'bool'
  • 即使函数被纠正,DAG 也已损坏

[52]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
# 仍然导致错误 ddf.head() --------------------------------------------------------------------------- TypeError Traceback (most recent call last)1 # 由于 ^2 而返回错误(需要是 **2) ----> 2 ddf.head() c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute) 898 899 if compute: --> 900 result = result.compute() 901 return result 902 c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs) 154 dask.base.compute 155 """ --> 156 (result,) = compute(self, traverse=False, **kwargs) 157 return result 158 pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

我们需要重置 dataframe

[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
[53]:
id name x y col
timestamp
2000-01-01 00:00:00 979 Frank 0.820777 0.098616 NaN
2000-01-01 00:00:01 990 Laura -0.851323 -0.501678 1.77659