DataFrames:读取杂乱数据
目录
实时 Notebook
您可以在实时会话中运行此 Notebook ,或在 Github 上查看。
DataFrames:读取杂乱数据¶
在01-data-access示例中,我们展示了 Dask Dataframes 如何以与 Pandas Dataframes 相同的多种格式读取和存储数据。使用 Dask Dataframes 时的一个关键区别是,我们通常不是使用像 pandas.read_csv 这样的函数打开单个文件,而是使用 dask.dataframe.read_csv 一次打开多个文件。这使我们能够将文件集合视为单个数据集。大多数时候,这都非常有效。但是真实世界的数据通常很杂乱,在本 Notebook 中,我们将探讨一种更高级的技术,用于将杂乱的数据集导入到 Dask DataFrame 中。
启动 Dask 客户端以查看仪表盘¶
启动 Dask 客户端是可选的。它将提供一个仪表盘,这有助于深入了解计算过程。
当您在下方创建客户端时,仪表盘的链接将可见。我们建议您在一侧屏幕打开仪表盘,同时在另一侧屏幕使用 Notebook。这可能需要一些精力来安排您的窗口,但在学习时同时查看两者非常有用。
[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:
客户端
Client-3a3f7d3e-0de1-11ed-a27f-000d3a8f7959
连接方法: Cluster object | 集群类型: distributed.LocalCluster |
仪表盘: http://127.0.0.1:8787/status |
集群信息
LocalCluster
df920051
仪表盘: http://127.0.0.1:8787/status | 工作节点 1 |
总线程数 4 | 总内存: 1.86 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
Scheduler-0c7591d1-fd03-435a-bfcf-c75404389145
通讯: tcp://127.0.0.1:43981 | 工作节点 1 |
仪表盘: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 1.86 GiB |
工作节点
工作节点:0
通讯: tcp://127.0.0.1:45589 | 总线程数 4 |
仪表盘: http://127.0.0.1:37755/status | 内存: 1.86 GiB |
Nanny: tcp://127.0.0.1:44473 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-pz_o1b0r |
创建人工数据集¶
首先,我们创建一个人工数据集并将其写入多个 CSV 文件。
您无需理解本节内容,我们只是为 Notebook 的其余部分创建一个数据集。
[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
[3]:
import os
import datetime
if not os.path.exists('data'):
os.mkdir('data')
def name(i):
""" Provide date for filename given index
Examples
--------
>>> name(0)
'2000-01-01'
>>> name(10)
'2000-01-11'
"""
return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))
df.to_csv('data/*.csv', name_function=name, index=False);
读取 CSV 文件¶
现在我们的数据目录中有很多 CSV 文件,2000 年 1 月份的每一天都有一个文件。每个 CSV 文件都包含当天的时间序列数据。我们可以使用带有 glob 字符串的 dd.read_csv
函数将它们全部读取为一个逻辑数据帧。
[4]:
!ls data/*.csv | head
data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
[5]:
import dask.dataframe as dd
df = dd.read_csv('data/2000-*-*.csv')
df
[5]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
int64 | object | float64 | float64 | |
... | ... | ... | ... | |
... | ... | ... | ... | ... |
... | ... | ... | ... | |
... | ... | ... | ... |
[6]:
df.head()
[6]:
id | name | x | y | |
---|---|---|---|---|
0 | 988 | Norbert | -0.742721 | -0.277954 |
1 | 1025 | Bob | 0.603313 | -0.161292 |
2 | 992 | Alice | -0.049408 | 0.573142 |
3 | 1029 | Bob | -0.122566 | 0.533852 |
4 | 1032 | Patricia | 0.476066 | -0.006417 |
让我们看看数据的一些统计信息
[7]:
df.describe().compute()
[7]:
id | x | y | |
---|---|---|---|
计数 | 2.592000e+06 | 2.592000e+06 | 2.592000e+06 |
均值 | 9.999909e+02 | -1.752288e-04 | 1.272128e-04 |
标准差 | 3.163993e+01 | 5.772766e-01 | 5.773655e-01 |
最小值 | 8.370000e+02 | -9.999999e-01 | -9.999995e-01 |
25% | 9.790000e+02 | -4.924166e-01 | -4.938494e-01 |
50% | 1.000000e+03 | 9.977439e-03 | 4.362202e-03 |
75% | 1.022000e+03 | 5.070134e-01 | 5.083363e-01 |
最大值 | 1.160000e+03 | 9.999979e-01 | 9.999995e-01 |
制造一些杂乱数据¶
现在这工作得很好,在大多数情况下,dd.read_csv
或 dd.read_parquet
等是读取大量数据文件到 Dask DataFrame 的首选方法,但真实世界的数据通常非常杂乱,有些文件可能损坏或格式错误。为了模拟这一点,我们将通过修改示例 CSV 文件来创建一些假的杂乱数据。对于文件 data/2000-01-05.csv
,我们将用空数据替换它;对于文件 data/2000-01-07.csv
,我们将删除 y
列。
[8]:
# corrupt the data in data/2000-01-05.csv
with open('data/2000-01-05.csv', 'w') as f:
f.write('')
[9]:
# remove y column from data/2000-01-07.csv
import pandas as pd
df = pd.read_csv('data/2000-01-07.csv')
del df['y']
df.to_csv('data/2000-01-07.csv', index=False)
[10]:
!head data/2000-01-05.csv
[11]:
!head data/2000-01-07.csv
id,name,x
1032,Edith,0.341158963292153
1025,Yvonne,-0.0596561961788608
996,Hannah,-0.4598038238105364
1015,Norbert,-0.6893967021653444
976,Hannah,0.4339578272105588
1002,Dan,0.3519233500902228
917,Xavier,-0.928241343897473
1036,Hannah,-0.5115504865546654
972,Oliver,-0.3808144336718926
读取杂乱数据¶
让我们再次尝试读取文件集合
[12]:
df = dd.read_csv('data/2000-*-*.csv')
[13]:
df.head()
[13]:
id | name | x | y | |
---|---|---|---|---|
0 | 988 | Norbert | -0.742721 | -0.277954 |
1 | 1025 | Bob | 0.603313 | -0.161292 |
2 | 992 | Alice | -0.049408 | 0.573142 |
3 | 1029 | Bob | -0.122566 | 0.533852 |
4 | 1032 | Patricia | 0.476066 | -0.006417 |
好的,这看起来成功了,让我们再次计算数据集统计信息
[14]:
df.describe().compute()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [14], in <cell line: 1>()
----> 1 df.describe().compute()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
268 def compute(self, **kwargs):
269 """Compute this dask collection
270
271 This turns a lazy Dask collection into its in-memory equivalent.
(...)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3002 should_rejoin = False
3003 try:
-> 3004 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3005 finally:
3006 for f in futures.values():
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
2176 else:
2177 local_worker = None
-> 2178 return self.sync(
2179 self._gather,
2180 futures,
2181 errors=errors,
2182 direct=direct,
2183 local_worker=local_worker,
2184 asynchronous=asynchronous,
2185 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
316 return future
317 else:
--> 318 return sync(
319 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
320 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
383 if error:
384 typ, exc, tb = error
--> 385 raise exc.with_traceback(tb)
386 else:
387 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
356 future = asyncio.wait_for(future, callback_timeout)
357 future = asyncio.ensure_future(future)
--> 358 result = yield future
359 except Exception:
360 error = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
2039 exc = CancelledError(key)
2040 else:
-> 2041 raise exception.with_traceback(traceback)
2042 raise exc
2043 if errors == "skip":
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
38 def apply(func, args, kwargs=None):
39 if kwargs:
---> 40 return func(*args, **kwargs)
41 else:
42 return func(*args)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
6434 return meta
6435 if is_dataframe_like(df):
-> 6436 check_matching_columns(meta, df)
6437 c = meta.columns
6438 else:
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
413 else:
414 extra_info = "Order of columns does not match"
--> 415 raise ValueError(
416 "The columns in the computed data do not match"
417 " the columns in the provided metadata\n"
418 f"{extra_info}"
419 )
ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: []
Missing: ['y']
那么发生了什么?
当从文件集合创建 Dask DataFrame 时,dd.read_csv
会采样数据集中的前几个文件来确定可用的数据类型和列。由于它尚未打开所有文件,因此不知道其中一些是否已损坏。因此,df.head()
可以工作,因为它只查看第一个文件。df.describe.compute()
失败是因为文件 data/2000-01-05.csv
中的数据损坏。
构建延迟读取器¶
为了解决这个问题,我们将使用一种更高级的技术来构建我们的 Dask DataFrame。这种方法也可以在读取每个文件时需要一些自定义逻辑的任何时候使用。本质上,我们将构建一个使用 pandas 并进行一些错误检查的函数,并返回一个 pandas DataFrame。如果我们发现一个损坏的数据文件,我们要么找到一种方法来修复/清理数据,要么返回一个结构与正常数据相同的空 pandas DataFrame。
[15]:
import numpy as np
import io
def read_data(filename):
# for this to work we need to explicitly set the datatypes of our pandas dataframe
dtypes = {'id': int, 'name': str, 'x': float, 'y': float}
try:
# try reading in the data with pandas
df = pd.read_csv(filename, dtype=dtypes)
except:
# if this fails create an empty pandas dataframe with the same dtypes as the good data
df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)
# for the case with the missing column, add a column of data with NaN's
if 'y' not in df.columns:
df['y'] = np.NaN
return df
让我们在一个正常文件和两个损坏文件上测试这个函数
[16]:
# test function on a normal file
read_data('data/2000-01-01.csv').head()
[16]:
id | name | x | y | |
---|---|---|---|---|
0 | 988 | Norbert | -0.742721 | -0.277954 |
1 | 1025 | Bob | 0.603313 | -0.161292 |
2 | 992 | Alice | -0.049408 | 0.573142 |
3 | 1029 | Bob | -0.122566 | 0.533852 |
4 | 1032 | Patricia | 0.476066 | -0.006417 |
[17]:
# test function on the empty file
read_data('data/2000-01-05.csv').head()
[17]:
id | name | x | y |
---|
[18]:
# test function on the file missing the y column
read_data('data/2000-01-07.csv').head()
[18]:
id | name | x | y | |
---|---|---|---|---|
0 | 1032 | Edith | 0.341159 | NaN |
1 | 1025 | Yvonne | -0.059656 | NaN |
2 | 996 | Hannah | -0.459804 | NaN |
3 | 1015 | Norbert | -0.689397 | NaN |
4 | 976 | Hannah | 0.433958 | NaN |
组装 Dask 数据帧¶
首先,我们将 read_data
函数转换为 Dask delayed 函数
[19]:
from dask import delayed
read_data = delayed(read_data)
让我们看看这个函数现在做了什么
[20]:
df = read_data('data/2000-01-01.csv')
df
[20]:
Delayed('read_data-604fd047-a660-4c67-87ad-60569554e79e')
它创建了一个 delayed 对象,要实际运行读取文件,我们需要运行 .compute()
[21]:
df.compute()
[21]:
id | name | x | y | |
---|---|---|---|---|
0 | 988 | Norbert | -0.742721 | -0.277954 |
1 | 1025 | Bob | 0.603313 | -0.161292 |
2 | 992 | Alice | -0.049408 | 0.573142 |
3 | 1029 | Bob | -0.122566 | 0.533852 |
4 | 1032 | Patricia | 0.476066 | -0.006417 |
... | ... | ... | ... | ... |
86395 | 927 | Alice | 0.051035 | 0.051330 |
86396 | 968 | George | -0.389181 | 0.096867 |
86397 | 1039 | Alice | 0.396751 | 0.688604 |
86398 | 996 | Patricia | -0.042164 | -0.924152 |
86399 | 956 | Tim | 0.854212 | 0.858070 |
86400 行 × 4 列
现在让我们构建所有可用 CSV 文件的列表
[22]:
# loop over all the files
from glob import glob
files = glob('data/2000-*-*.csv')
files
[22]:
['data/2000-01-25.csv',
'data/2000-01-20.csv',
'data/2000-01-29.csv',
'data/2000-01-02.csv',
'data/2000-01-19.csv',
'data/2000-01-23.csv',
'data/2000-01-10.csv',
'data/2000-01-21.csv',
'data/2000-01-17.csv',
'data/2000-01-04.csv',
'data/2000-01-27.csv',
'data/2000-01-22.csv',
'data/2000-01-14.csv',
'data/2000-01-11.csv',
'data/2000-01-13.csv',
'data/2000-01-08.csv',
'data/2000-01-09.csv',
'data/2000-01-06.csv',
'data/2000-01-01.csv',
'data/2000-01-07.csv',
'data/2000-01-12.csv',
'data/2000-01-16.csv',
'data/2000-01-26.csv',
'data/2000-01-24.csv',
'data/2000-01-18.csv',
'data/2000-01-15.csv',
'data/2000-01-03.csv',
'data/2000-01-30.csv',
'data/2000-01-28.csv',
'data/2000-01-05.csv']
现在我们对列表中的每个文件运行 delayed read_data
函数
[23]:
df = [read_data(file) for file in files]
df
[23]:
[Delayed('read_data-6958e08a-f47f-4da2-9ceb-cb995eab99bf'),
Delayed('read_data-30f33e21-323e-49c0-ae36-a7c6289d8ada'),
Delayed('read_data-2fdfe85e-79e1-417d-af2d-3a577fe15975'),
Delayed('read_data-72b35641-90b5-4518-bbac-fa9c9024c756'),
Delayed('read_data-e3adc855-9df0-4985-87fd-f95e3f2d10b7'),
Delayed('read_data-f070f86c-bff6-448e-abe0-50baaf9282b0'),
Delayed('read_data-f4ed9f6d-c5ae-44aa-ba0e-a2eaf2cd749a'),
Delayed('read_data-2a55c497-9a5a-4474-8dca-fc243ee5a5bf'),
Delayed('read_data-1fb346a5-4a27-4772-ab6b-94419f328ae0'),
Delayed('read_data-72610d5f-2847-4afb-9c86-af08217797d2'),
Delayed('read_data-f3bbcc5b-c2f6-4a5f-8d3e-1d50fb30dc69'),
Delayed('read_data-52b113b8-1692-4ff9-86b4-cb65e066e1c3'),
Delayed('read_data-ff401421-8ccf-4e29-bf70-8b63ed4e8b90'),
Delayed('read_data-ebe81647-e84f-4377-ba1c-26f220aed7e3'),
Delayed('read_data-dabf5c6c-e459-4f89-9a02-4b4a11879708'),
Delayed('read_data-c7b3408b-2cec-41e1-9553-fb9a24a338b0'),
Delayed('read_data-fbd802e1-f886-4035-a285-1d657e1074e5'),
Delayed('read_data-fc2fb366-2ef9-4eaf-bfee-6679420f4080'),
Delayed('read_data-9f4b137b-6dd0-491c-bf55-6cb40a502918'),
Delayed('read_data-3d109e18-3e32-495d-940b-1882b33ab6dd'),
Delayed('read_data-8915acd4-a325-48fd-b147-a0c7a238f0df'),
Delayed('read_data-ec5de8ae-f438-4b65-9214-3dab09f1e05a'),
Delayed('read_data-9b519672-8a00-4c53-a1ff-c1f960272d4c'),
Delayed('read_data-6594c4c0-d33e-4f13-8fcc-ae39a840b3f9'),
Delayed('read_data-80c1be62-beeb-4317-91ba-6363d6f8eee5'),
Delayed('read_data-cf7ac988-9874-4b62-91f8-148c60c670c0'),
Delayed('read_data-2175062e-82b5-4d46-b1c7-31d301e26ba3'),
Delayed('read_data-6a97a8fc-a3df-494e-8870-0ba7b6638444'),
Delayed('read_data-14d8926a-674d-4e17-b603-9f2da75bd25c'),
Delayed('read_data-d6be5de8-1b74-4a12-bc45-7d7f4e7bd190')]
然后我们使用 dask.dataframe.from_delayed。这个函数从 delayed 对象列表创建 Dask DataFrame,前提是每个 delayed 对象都返回一个 pandas DataFrame。返回的每个单独数据帧的结构也必须相同。
[24]:
df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})
df
[24]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
int64 | object | float64 | float64 | |
... | ... | ... | ... | |
... | ... | ... | ... | ... |
... | ... | ... | ... | |
... | ... | ... | ... |
注意:我们在 meta
关键字中提供了 dtypes
,以明确告诉 Dask DataFrame 期望哪种类型的数据帧。如果我们不这样做,Dask 将从第一个 delayed 对象推断,如果它是一个大型 CSV 文件,这可能会很慢。
现在让我们看看是否奏效¶
[25]:
df.head()
[25]:
id | name | x | y | |
---|---|---|---|---|
0 | 976 | Oliver | 0.628767 | 0.765093 |
1 | 1053 | Sarah | -0.047006 | -0.955109 |
2 | 1049 | Quinn | -0.032074 | -0.099608 |
3 | 1005 | Frank | -0.255920 | 0.963524 |
4 | 993 | Ursula | 0.980263 | -0.875488 |
[26]:
df.describe().compute()
[26]:
id | x | y | |
---|---|---|---|
计数 | 2.505600e+06 | 2.505600e+06 | 2.419200e+06 |
均值 | 9.999870e+02 | -1.615087e-04 | 7.238821e-05 |
标准差 | 3.163868e+01 | 5.772576e-01 | 5.774155e-01 |
最小值 | 8.370000e+02 | -9.999999e-01 | -9.999995e-01 |
25% | 9.790000e+02 | -4.924166e-01 | -4.938494e-01 |
50% | 1.000000e+03 | 9.977439e-03 | 4.362202e-03 |
75% | 1.022000e+03 | 5.070134e-01 | 5.083363e-01 |
最大值 | 1.160000e+03 | 9.999979e-01 | 9.999995e-01 |
成功!¶
总结一下,在本例中,我们探讨了一种从多个数据文件集合创建 Dask DataFrame 的方法。通常,您会使用像 dd.read_csv
或 dd.read_parquet
这样的内置函数来完成此操作。有时,由于数据集中的文件杂乱/损坏或需要进行一些自定义处理,这可能无法实现。
在这些情况下,您可以使用以下步骤构建 Dask DataFrame。
创建一个常规 Python 函数,该函数读取数据,执行任何转换、错误检查等,并且始终返回一个结构相同的 Pandas DataFrame
使用
dask.delayed
函数将此读取函数转换为 delayed 对象使用 delayed 数据读取器调用数据集中的每个文件,并将输出组装成 delayed 对象列表
使用
dd.from_delayed
将 delayed 对象列表转换为 Dask DataFrame
这种相同的技术也可以用于其他情况。另一个例子可能是需要使用专用读取器或经过多次转换才能转换为 pandas DataFrame 的数据文件。