实时 Notebook

您可以在实时会话中运行此 Notebook Binder,或在 Github 上查看

DataFrames:读取杂乱数据

01-data-access示例中,我们展示了 Dask Dataframes 如何以与 Pandas Dataframes 相同的多种格式读取和存储数据。使用 Dask Dataframes 时的一个关键区别是,我们通常不是使用像 pandas.read_csv 这样的函数打开单个文件,而是使用 dask.dataframe.read_csv 一次打开多个文件。这使我们能够将文件集合视为单个数据集。大多数时候,这都非常有效。但是真实世界的数据通常很杂乱,在本 Notebook 中,我们将探讨一种更高级的技术,用于将杂乱的数据集导入到 Dask DataFrame 中。

启动 Dask 客户端以查看仪表盘

启动 Dask 客户端是可选的。它将提供一个仪表盘,这有助于深入了解计算过程。

当您在下方创建客户端时,仪表盘的链接将可见。我们建议您在一侧屏幕打开仪表盘,同时在另一侧屏幕使用 Notebook。这可能需要一些精力来安排您的窗口,但在学习时同时查看两者非常有用。

[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:

客户端

Client-3a3f7d3e-0de1-11ed-a27f-000d3a8f7959

连接方法: Cluster object 集群类型: distributed.LocalCluster
仪表盘: http://127.0.0.1:8787/status

集群信息

创建人工数据集

首先,我们创建一个人工数据集并将其写入多个 CSV 文件。

您无需理解本节内容,我们只是为 Notebook 的其余部分创建一个数据集。

[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
Dask DataFrame 结构
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask 名称:make-timeseries,30 个任务
[3]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name, index=False);

读取 CSV 文件

现在我们的数据目录中有很多 CSV 文件,2000 年 1 月份的每一天都有一个文件。每个 CSV 文件都包含当天的时间序列数据。我们可以使用带有 glob 字符串的 dd.read_csv 函数将它们全部读取为一个逻辑数据帧。

[4]:
!ls data/*.csv | head
data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
[5]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df
[5]:
Dask DataFrame 结构
id name x y
npartitions=30
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask 名称:read-csv,30 个任务
[6]:
df.head()
[6]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417

让我们看看数据的一些统计信息

[7]:
df.describe().compute()
[7]:
id x y
计数 2.592000e+06 2.592000e+06 2.592000e+06
均值 9.999909e+02 -1.752288e-04 1.272128e-04
标准差 3.163993e+01 5.772766e-01 5.773655e-01
最小值 8.370000e+02 -9.999999e-01 -9.999995e-01
25% 9.790000e+02 -4.924166e-01 -4.938494e-01
50% 1.000000e+03 9.977439e-03 4.362202e-03
75% 1.022000e+03 5.070134e-01 5.083363e-01
最大值 1.160000e+03 9.999979e-01 9.999995e-01

制造一些杂乱数据

现在这工作得很好,在大多数情况下,dd.read_csvdd.read_parquet 等是读取大量数据文件到 Dask DataFrame 的首选方法,但真实世界的数据通常非常杂乱,有些文件可能损坏或格式错误。为了模拟这一点,我们将通过修改示例 CSV 文件来创建一些假的杂乱数据。对于文件 data/2000-01-05.csv,我们将用空数据替换它;对于文件 data/2000-01-07.csv,我们将删除 y 列。

[8]:
# corrupt the data in data/2000-01-05.csv
with open('data/2000-01-05.csv', 'w') as f:
    f.write('')
[9]:
# remove y column from data/2000-01-07.csv
import pandas as pd
df = pd.read_csv('data/2000-01-07.csv')
del df['y']
df.to_csv('data/2000-01-07.csv', index=False)
[10]:
!head data/2000-01-05.csv
[11]:
!head data/2000-01-07.csv
id,name,x
1032,Edith,0.341158963292153
1025,Yvonne,-0.0596561961788608
996,Hannah,-0.4598038238105364
1015,Norbert,-0.6893967021653444
976,Hannah,0.4339578272105588
1002,Dan,0.3519233500902228
917,Xavier,-0.928241343897473
1036,Hannah,-0.5115504865546654
972,Oliver,-0.3808144336718926

读取杂乱数据

让我们再次尝试读取文件集合

[12]:
df = dd.read_csv('data/2000-*-*.csv')
[13]:
df.head()
[13]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417

好的,这看起来成功了,让我们再次计算数据集统计信息

[14]:
df.describe().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [14], in <cell line: 1>()
----> 1 df.describe().compute()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3002         should_rejoin = False
   3003 try:
-> 3004     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3005 finally:
   3006     for f in futures.values():

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
   2176 else:
   2177     local_worker = None
-> 2178 return self.sync(
   2179     self._gather,
   2180     futures,
   2181     errors=errors,
   2182     direct=direct,
   2183     local_worker=local_worker,
   2184     asynchronous=asynchronous,
   2185 )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    316     return future
    317 else:
--> 318     return sync(
    319         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    320     )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
    383 if error:
    384     typ, exc, tb = error
--> 385     raise exc.with_traceback(tb)
    386 else:
    387     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
    356         future = asyncio.wait_for(future, callback_timeout)
    357     future = asyncio.ensure_future(future)
--> 358     result = yield future
    359 except Exception:
    360     error = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
   2039         exc = CancelledError(key)
   2040     else:
-> 2041         raise exception.with_traceback(traceback)
   2042     raise exc
   2043 if errors == "skip":

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
     38 def apply(func, args, kwargs=None):
     39     if kwargs:
---> 40         return func(*args, **kwargs)
     41     else:
     42         return func(*args)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
   6434     return meta
   6435 if is_dataframe_like(df):
-> 6436     check_matching_columns(meta, df)
   6437     c = meta.columns
   6438 else:

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
    413 else:
    414     extra_info = "Order of columns does not match"
--> 415 raise ValueError(
    416     "The columns in the computed data do not match"
    417     " the columns in the provided metadata\n"
    418     f"{extra_info}"
    419 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   []
  Missing: ['y']

那么发生了什么?

当从文件集合创建 Dask DataFrame 时,dd.read_csv 会采样数据集中的前几个文件来确定可用的数据类型和列。由于它尚未打开所有文件,因此不知道其中一些是否已损坏。因此,df.head() 可以工作,因为它只查看第一个文件。df.describe.compute() 失败是因为文件 data/2000-01-05.csv 中的数据损坏。

构建延迟读取器

为了解决这个问题,我们将使用一种更高级的技术来构建我们的 Dask DataFrame。这种方法也可以在读取每个文件时需要一些自定义逻辑的任何时候使用。本质上,我们将构建一个使用 pandas 并进行一些错误检查的函数,并返回一个 pandas DataFrame。如果我们发现一个损坏的数据文件,我们要么找到一种方法来修复/清理数据,要么返回一个结构与正常数据相同的空 pandas DataFrame。

[15]:
import numpy as np
import io

def read_data(filename):

    # for this to work we need to explicitly set the datatypes of our pandas dataframe
    dtypes = {'id': int, 'name': str, 'x': float, 'y': float}
    try:
        # try reading in the data with pandas
        df = pd.read_csv(filename, dtype=dtypes)
    except:
        # if this fails create an empty pandas dataframe with the same dtypes as the good data
        df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)

    # for the case with the missing column, add a column of data with NaN's
    if 'y' not in df.columns:
        df['y'] = np.NaN

    return df

让我们在一个正常文件和两个损坏文件上测试这个函数

[16]:
# test function on a normal file
read_data('data/2000-01-01.csv').head()
[16]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417
[17]:
# test function on the empty file
read_data('data/2000-01-05.csv').head()
[17]:
id name x y
[18]:
# test function on the file missing the y column
read_data('data/2000-01-07.csv').head()
[18]:
id name x y
0 1032 Edith 0.341159 NaN
1 1025 Yvonne -0.059656 NaN
2 996 Hannah -0.459804 NaN
3 1015 Norbert -0.689397 NaN
4 976 Hannah 0.433958 NaN

组装 Dask 数据帧

首先,我们将 read_data 函数转换为 Dask delayed 函数

[19]:
from dask import delayed
read_data = delayed(read_data)

让我们看看这个函数现在做了什么

[20]:
df = read_data('data/2000-01-01.csv')
df
[20]:
Delayed('read_data-604fd047-a660-4c67-87ad-60569554e79e')

它创建了一个 delayed 对象,要实际运行读取文件,我们需要运行 .compute()

[21]:
df.compute()
[21]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417
... ... ... ... ...
86395 927 Alice 0.051035 0.051330
86396 968 George -0.389181 0.096867
86397 1039 Alice 0.396751 0.688604
86398 996 Patricia -0.042164 -0.924152
86399 956 Tim 0.854212 0.858070

86400 行 × 4 列

现在让我们构建所有可用 CSV 文件的列表

[22]:
# loop over all the files
from glob import glob
files = glob('data/2000-*-*.csv')
files
[22]:
['data/2000-01-25.csv',
 'data/2000-01-20.csv',
 'data/2000-01-29.csv',
 'data/2000-01-02.csv',
 'data/2000-01-19.csv',
 'data/2000-01-23.csv',
 'data/2000-01-10.csv',
 'data/2000-01-21.csv',
 'data/2000-01-17.csv',
 'data/2000-01-04.csv',
 'data/2000-01-27.csv',
 'data/2000-01-22.csv',
 'data/2000-01-14.csv',
 'data/2000-01-11.csv',
 'data/2000-01-13.csv',
 'data/2000-01-08.csv',
 'data/2000-01-09.csv',
 'data/2000-01-06.csv',
 'data/2000-01-01.csv',
 'data/2000-01-07.csv',
 'data/2000-01-12.csv',
 'data/2000-01-16.csv',
 'data/2000-01-26.csv',
 'data/2000-01-24.csv',
 'data/2000-01-18.csv',
 'data/2000-01-15.csv',
 'data/2000-01-03.csv',
 'data/2000-01-30.csv',
 'data/2000-01-28.csv',
 'data/2000-01-05.csv']

现在我们对列表中的每个文件运行 delayed read_data 函数

[23]:
df = [read_data(file) for file in files]
df
[23]:
[Delayed('read_data-6958e08a-f47f-4da2-9ceb-cb995eab99bf'),
 Delayed('read_data-30f33e21-323e-49c0-ae36-a7c6289d8ada'),
 Delayed('read_data-2fdfe85e-79e1-417d-af2d-3a577fe15975'),
 Delayed('read_data-72b35641-90b5-4518-bbac-fa9c9024c756'),
 Delayed('read_data-e3adc855-9df0-4985-87fd-f95e3f2d10b7'),
 Delayed('read_data-f070f86c-bff6-448e-abe0-50baaf9282b0'),
 Delayed('read_data-f4ed9f6d-c5ae-44aa-ba0e-a2eaf2cd749a'),
 Delayed('read_data-2a55c497-9a5a-4474-8dca-fc243ee5a5bf'),
 Delayed('read_data-1fb346a5-4a27-4772-ab6b-94419f328ae0'),
 Delayed('read_data-72610d5f-2847-4afb-9c86-af08217797d2'),
 Delayed('read_data-f3bbcc5b-c2f6-4a5f-8d3e-1d50fb30dc69'),
 Delayed('read_data-52b113b8-1692-4ff9-86b4-cb65e066e1c3'),
 Delayed('read_data-ff401421-8ccf-4e29-bf70-8b63ed4e8b90'),
 Delayed('read_data-ebe81647-e84f-4377-ba1c-26f220aed7e3'),
 Delayed('read_data-dabf5c6c-e459-4f89-9a02-4b4a11879708'),
 Delayed('read_data-c7b3408b-2cec-41e1-9553-fb9a24a338b0'),
 Delayed('read_data-fbd802e1-f886-4035-a285-1d657e1074e5'),
 Delayed('read_data-fc2fb366-2ef9-4eaf-bfee-6679420f4080'),
 Delayed('read_data-9f4b137b-6dd0-491c-bf55-6cb40a502918'),
 Delayed('read_data-3d109e18-3e32-495d-940b-1882b33ab6dd'),
 Delayed('read_data-8915acd4-a325-48fd-b147-a0c7a238f0df'),
 Delayed('read_data-ec5de8ae-f438-4b65-9214-3dab09f1e05a'),
 Delayed('read_data-9b519672-8a00-4c53-a1ff-c1f960272d4c'),
 Delayed('read_data-6594c4c0-d33e-4f13-8fcc-ae39a840b3f9'),
 Delayed('read_data-80c1be62-beeb-4317-91ba-6363d6f8eee5'),
 Delayed('read_data-cf7ac988-9874-4b62-91f8-148c60c670c0'),
 Delayed('read_data-2175062e-82b5-4d46-b1c7-31d301e26ba3'),
 Delayed('read_data-6a97a8fc-a3df-494e-8870-0ba7b6638444'),
 Delayed('read_data-14d8926a-674d-4e17-b603-9f2da75bd25c'),
 Delayed('read_data-d6be5de8-1b74-4a12-bc45-7d7f4e7bd190')]

然后我们使用 dask.dataframe.from_delayed。这个函数从 delayed 对象列表创建 Dask DataFrame,前提是每个 delayed 对象都返回一个 pandas DataFrame。返回的每个单独数据帧的结构也必须相同。

[24]:
df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})
df
[24]:
Dask DataFrame 结构
id name x y
npartitions=30
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask 名称:from-delayed,60 个任务

注意:我们在 meta 关键字中提供了 dtypes,以明确告诉 Dask DataFrame 期望哪种类型的数据帧。如果我们不这样做,Dask 将从第一个 delayed 对象推断,如果它是一个大型 CSV 文件,这可能会很慢。

现在让我们看看是否奏效

[25]:
df.head()
[25]:
id name x y
0 976 Oliver 0.628767 0.765093
1 1053 Sarah -0.047006 -0.955109
2 1049 Quinn -0.032074 -0.099608
3 1005 Frank -0.255920 0.963524
4 993 Ursula 0.980263 -0.875488
[26]:
df.describe().compute()
[26]:
id x y
计数 2.505600e+06 2.505600e+06 2.419200e+06
均值 9.999870e+02 -1.615087e-04 7.238821e-05
标准差 3.163868e+01 5.772576e-01 5.774155e-01
最小值 8.370000e+02 -9.999999e-01 -9.999995e-01
25% 9.790000e+02 -4.924166e-01 -4.938494e-01
50% 1.000000e+03 9.977439e-03 4.362202e-03
75% 1.022000e+03 5.070134e-01 5.083363e-01
最大值 1.160000e+03 9.999979e-01 9.999995e-01

成功!

总结一下,在本例中,我们探讨了一种从多个数据文件集合创建 Dask DataFrame 的方法。通常,您会使用像 dd.read_csvdd.read_parquet 这样的内置函数来完成此操作。有时,由于数据集中的文件杂乱/损坏或需要进行一些自定义处理,这可能无法实现。

在这些情况下,您可以使用以下步骤构建 Dask DataFrame。

  1. 创建一个常规 Python 函数,该函数读取数据,执行任何转换、错误检查等,并且始终返回一个结构相同的 Pandas DataFrame

  2. 使用 dask.delayed 函数将此读取函数转换为 delayed 对象

  3. 使用 delayed 数据读取器调用数据集中的每个文件,并将输出组装成 delayed 对象列表

  4. 使用 dd.from_delayed 将 delayed 对象列表转换为 Dask DataFrame

这种相同的技术也可以用于其他情况。另一个例子可能是需要使用专用读取器或经过多次转换才能转换为 pandas DataFrame 的数据文件。