Dask Bag
目录
在线 Notebook
您可以在 在线会话 中运行此 notebook 或在 Github 上查看。
Dask Bag¶
Dask Bag 在 Python 对象集合上实现了诸如 map
、filter
、groupby
和聚合等操作。它使用 Python 迭代器以并行且占用内存少的方式完成这些操作。它类似于 itertools 的并行版本或 PySpark RDD 的 Pythonic 版本。
Dask Bag 通常用于对日志文件、JSON 记录或其他用户定义的 Python 对象进行简单的预处理。
完整的 API 文档可在此处获取:https://docs.dask.org.cn/en/latest/bag-api.html
启动 Dask Client 以查看 Dashboard¶
启动 Dask Client 是可选的。它将提供一个 Dashboard,这对于深入了解计算过程很有用。
当您在下方创建 Client 后,将显示 Dashboard 的链接。我们建议您在一侧屏幕上打开 Dashboard,同时在另一侧使用 notebook。这可能需要一些努力来安排您的窗口,但在学习时同时看到它们非常有用。
[1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:
Client
Client-db1fb37f-0ddf-11ed-9823-000d3a8f7959
连接方法: Cluster 对象 | Cluster 类型: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster 信息
LocalCluster
fce50585
Dashboard: http://127.0.0.1:8787/status | Worker 4 |
总线程数 4 | 总内存: 6.78 GiB |
状态: 运行中 | 使用进程: True |
Scheduler 信息
Scheduler
Scheduler-6db95d18-fecb-4e65-8bb9-c1ecfb644d25
通信: tcp://127.0.0.1:37071 | Worker 4 |
Dashboard: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 6.78 GiB |
Worker
Worker: 0
通信: tcp://127.0.0.1:38523 | 总线程数 1 |
Dashboard: http://127.0.0.1:33803/status | 内存: 1.70 GiB |
Nanny: tcp://127.0.0.1:46261 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-z2ivdlv5 |
Worker: 1
通信: tcp://127.0.0.1:38005 | 总线程数 1 |
Dashboard: http://127.0.0.1:43833/status | 内存: 1.70 GiB |
Nanny: tcp://127.0.0.1:36091 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-dknui8s8 |
Worker: 2
通信: tcp://127.0.0.1:36089 | 总线程数 1 |
Dashboard: http://127.0.0.1:46853/status | 内存: 1.70 GiB |
Nanny: tcp://127.0.0.1:45553 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-ei436jch |
Worker: 3
通信: tcp://127.0.0.1:34709 | 总线程数 1 |
Dashboard: http://127.0.0.1:35105/status | 内存: 1.70 GiB |
Nanny: tcp://127.0.0.1:40747 | |
本地目录: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-uvqwrfkf |
创建随机数据¶
我们创建一组随机记录数据,并将其存储到磁盘上的多个 JSON 文件中。这些数据将作为本 notebook 的示例数据。
[2]:
import dask
import json
import os
os.makedirs('data', exist_ok=True) # Create data/ directory
b = dask.datasets.make_people() # Make records of people
b.map(json.dumps).to_textfiles('data/*.json') # Encode as JSON, write to disk
[2]:
['/home/runner/work/dask-examples/dask-examples/data/0.json',
'/home/runner/work/dask-examples/dask-examples/data/1.json',
'/home/runner/work/dask-examples/dask-examples/data/2.json',
'/home/runner/work/dask-examples/dask-examples/data/3.json',
'/home/runner/work/dask-examples/dask-examples/data/4.json',
'/home/runner/work/dask-examples/dask-examples/data/5.json',
'/home/runner/work/dask-examples/dask-examples/data/6.json',
'/home/runner/work/dask-examples/dask-examples/data/7.json',
'/home/runner/work/dask-examples/dask-examples/data/8.json',
'/home/runner/work/dask-examples/dask-examples/data/9.json']
读取 JSON 数据¶
既然我们有了文件中的 JSON 数据,现在就使用 Dask Bag 和 Python 的 JSON 模块来看看它。
[3]:
!head -n 2 data/0.json
{"age": 61, "name": ["Emiko", "Oliver"], "occupation": "Medical Student", "telephone": "166.814.5565", "address": {"address": "645 Drumm Line", "city": "Kennewick"}, "credit-card": {"number": "3792 459318 98518", "expiration-date": "12/23"}}
{"age": 54, "name": ["Wendolyn", "Ortega"], "occupation": "Tractor Driver", "telephone": "1-975-090-1672", "address": {"address": "1274 Harbor Court", "city": "Mustang"}, "credit-card": {"number": "4600 5899 6829 6887", "expiration-date": "11/25"}}
[4]:
import dask.bag as db
import json
b = db.read_text('data/*.json').map(json.loads)
b
[4]:
dask.bag<loads, npartitions=10>
[5]:
b.take(2)
[5]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
{'age': 54,
'name': ['Wendolyn', 'Ortega'],
'occupation': 'Tractor Driver',
'telephone': '1-975-090-1672',
'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
'credit-card': {'number': '4600 5899 6829 6887',
'expiration-date': '11/25'}})
映射、过滤、聚合¶
我们可以通过过滤出感兴趣的特定记录,在其上映射函数来处理数据,并将这些结果聚合成总值来处理这些数据。
[6]:
b.filter(lambda record: record['age'] > 30).take(2) # Select only people over 30
[6]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
{'age': 54,
'name': ['Wendolyn', 'Ortega'],
'occupation': 'Tractor Driver',
'telephone': '1-975-090-1672',
'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
'credit-card': {'number': '4600 5899 6829 6887',
'expiration-date': '11/25'}})
[7]:
b.map(lambda record: record['occupation']).take(2) # Select the occupation field
[7]:
('Medical Student', 'Tractor Driver')
[8]:
b.count().compute() # Count total number of records
[8]:
10000
链式计算¶
通常将许多这些步骤串联在一个流水线中,仅在最后调用 compute
或 take
。
[9]:
result = (b.filter(lambda record: record['age'] > 30)
.map(lambda record: record['occupation'])
.frequencies(sort=True)
.topk(10, key=1))
result
[9]:
dask.bag<topk-aggregate, npartitions=1>
与所有延迟执行的 Dask 集合一样,我们需要调用 compute
来实际评估结果。在早期示例中使用的 take
方法也类似于 compute
,并且也会触发计算。
[10]:
result.compute()
[10]:
[('Merchant', 16),
('Coroner', 14),
('Book Binder', 13),
('Medical Practitioner', 13),
('Payroll Supervisor', 13),
('Telecommunications', 13),
('Thermal Insulator', 13),
('Pattern Maker', 12),
('Advertising Executive', 12),
('Insurance Staff', 12)]
转换和存储¶
有时我们希望像上面那样计算聚合,但有时我们希望将结果存储到磁盘上以便将来分析。为此,我们可以使用诸如 to_textfiles
和 json.dumps
之类的方法,或者我们可以转换为 Dask DataFrame 并使用它们的存储系统,这将在下一节中详细介绍。
[11]:
(b.filter(lambda record: record['age'] > 30) # Select records of interest
.map(json.dumps) # Convert Python objects to text
.to_textfiles('data/processed.*.json')) # Write to local disk
[11]:
['/home/runner/work/dask-examples/dask-examples/data/processed.0.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.1.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.2.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.3.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.4.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.5.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.6.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.7.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.8.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.9.json']
转换为 Dask DataFrame¶
Dask Bag 非常适合读取初始数据,进行少量预处理,然后将其传递给其他更有效的形式,例如 Dask DataFrame。Dask DataFrame 内部使用 Pandas,因此在数值数据上速度更快,并且具有更复杂的算法。
然而,Dask DataFrame 也需要数据以扁平列的形式组织。它对嵌套的 JSON 数据支持不太好(Bag 更适合处理此类数据)。
在这里,我们创建一个函数来展平嵌套数据结构,将其映射到记录上,然后将其转换为 Dask DataFrame。
[12]:
b.take(1)
[12]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},)
[13]:
def flatten(record):
return {
'age': record['age'],
'occupation': record['occupation'],
'telephone': record['telephone'],
'credit-card-number': record['credit-card']['number'],
'credit-card-expiration': record['credit-card']['expiration-date'],
'name': ' '.join(record['name']),
'street-address': record['address']['address'],
'city': record['address']['city']
}
b.map(flatten).take(1)
[13]:
({'age': 61,
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'credit-card-number': '3792 459318 98518',
'credit-card-expiration': '12/23',
'name': 'Emiko Oliver',
'street-address': '645 Drumm Line',
'city': 'Kennewick'},)
[14]:
df = b.map(flatten).to_dataframe()
df.head()
[14]:
年龄 | 职业 | 电话 | 信用卡号 | 信用卡有效期 | 姓名 | 街道地址 | 城市 | |
---|---|---|---|---|---|---|---|---|
0 | 61 | 医学生 | 166.814.5565 | 3792 459318 98518 | 12/23 | Emiko Oliver | 645 Drumm Line | Kennewick |
1 | 54 | 拖拉机司机 | 1-975-090-1672 | 4600 5899 6829 6887 | 11/25 | Wendolyn Ortega | 1274 Harbor Court | Mustang |
2 | 33 | 医生 | 107-044-4885 | 3464 081512 23342 | 03/20 | Alvin Rich | 1242 Vidal Plantation | Wyandotte |
3 | 34 | 顾问 | 219-748-6795 | 4018 1801 8111 7757 | 08/23 | Toccara Rogers | 252 Sampson Drive | Parma Heights |
4 | 34 | 平面设计师 | 1-509-313-7125 | 4886 7380 4681 0434 | 05/18 | Randal Roberts | 767 Telegraph Side road | 纽约 |
现在我们可以执行与之前相同的计算,但现在使用 Pandas 和 Dask DataFrame。
[15]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
[15]:
Merchant 16
Coroner 14
Thermal Insulator 13
Book Binder 13
Payroll Supervisor 13
Medical Practitioner 13
Telecommunications 13
Optometrist 12
Advertising Assistant 12
Care Manager 12
Name: occupation, dtype: int64