在线 Notebook

您可以在 在线会话 中运行此 notebook Binder 或在 Github 上查看

Dask Bag

Dask Bag 在 Python 对象集合上实现了诸如 mapfiltergroupby 和聚合等操作。它使用 Python 迭代器以并行且占用内存少的方式完成这些操作。它类似于 itertools 的并行版本或 PySpark RDD 的 Pythonic 版本。

Dask Bag 通常用于对日志文件、JSON 记录或其他用户定义的 Python 对象进行简单的预处理。

完整的 API 文档可在此处获取:https://docs.dask.org.cn/en/latest/bag-api.html

启动 Dask Client 以查看 Dashboard

启动 Dask Client 是可选的。它将提供一个 Dashboard,这对于深入了解计算过程很有用。

当您在下方创建 Client 后,将显示 Dashboard 的链接。我们建议您在一侧屏幕上打开 Dashboard,同时在另一侧使用 notebook。这可能需要一些努力来安排您的窗口,但在学习时同时看到它们非常有用。

[1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:

Client

Client-db1fb37f-0ddf-11ed-9823-000d3a8f7959

连接方法: Cluster 对象 Cluster 类型: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster 信息

创建随机数据

我们创建一组随机记录数据,并将其存储到磁盘上的多个 JSON 文件中。这些数据将作为本 notebook 的示例数据。

[2]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk
[2]:
['/home/runner/work/dask-examples/dask-examples/data/0.json',
 '/home/runner/work/dask-examples/dask-examples/data/1.json',
 '/home/runner/work/dask-examples/dask-examples/data/2.json',
 '/home/runner/work/dask-examples/dask-examples/data/3.json',
 '/home/runner/work/dask-examples/dask-examples/data/4.json',
 '/home/runner/work/dask-examples/dask-examples/data/5.json',
 '/home/runner/work/dask-examples/dask-examples/data/6.json',
 '/home/runner/work/dask-examples/dask-examples/data/7.json',
 '/home/runner/work/dask-examples/dask-examples/data/8.json',
 '/home/runner/work/dask-examples/dask-examples/data/9.json']

读取 JSON 数据

既然我们有了文件中的 JSON 数据,现在就使用 Dask Bag 和 Python 的 JSON 模块来看看它。

[3]:
!head -n 2 data/0.json
{"age": 61, "name": ["Emiko", "Oliver"], "occupation": "Medical Student", "telephone": "166.814.5565", "address": {"address": "645 Drumm Line", "city": "Kennewick"}, "credit-card": {"number": "3792 459318 98518", "expiration-date": "12/23"}}
{"age": 54, "name": ["Wendolyn", "Ortega"], "occupation": "Tractor Driver", "telephone": "1-975-090-1672", "address": {"address": "1274 Harbor Court", "city": "Mustang"}, "credit-card": {"number": "4600 5899 6829 6887", "expiration-date": "11/25"}}
[4]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b
[4]:
dask.bag<loads, npartitions=10>
[5]:
b.take(2)
[5]:
({'age': 61,
  'name': ['Emiko', 'Oliver'],
  'occupation': 'Medical Student',
  'telephone': '166.814.5565',
  'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
  'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
 {'age': 54,
  'name': ['Wendolyn', 'Ortega'],
  'occupation': 'Tractor Driver',
  'telephone': '1-975-090-1672',
  'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
  'credit-card': {'number': '4600 5899 6829 6887',
   'expiration-date': '11/25'}})

映射、过滤、聚合

我们可以通过过滤出感兴趣的特定记录,在其上映射函数来处理数据,并将这些结果聚合成总值来处理这些数据。

[6]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30
[6]:
({'age': 61,
  'name': ['Emiko', 'Oliver'],
  'occupation': 'Medical Student',
  'telephone': '166.814.5565',
  'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
  'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
 {'age': 54,
  'name': ['Wendolyn', 'Ortega'],
  'occupation': 'Tractor Driver',
  'telephone': '1-975-090-1672',
  'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
  'credit-card': {'number': '4600 5899 6829 6887',
   'expiration-date': '11/25'}})
[7]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field
[7]:
('Medical Student', 'Tractor Driver')
[8]:
b.count().compute()  # Count total number of records
[8]:
10000

链式计算

通常将许多这些步骤串联在一个流水线中,仅在最后调用 computetake

[9]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result
[9]:
dask.bag<topk-aggregate, npartitions=1>

与所有延迟执行的 Dask 集合一样,我们需要调用 compute 来实际评估结果。在早期示例中使用的 take 方法也类似于 compute,并且也会触发计算。

[10]:
result.compute()
[10]:
[('Merchant', 16),
 ('Coroner', 14),
 ('Book Binder', 13),
 ('Medical Practitioner', 13),
 ('Payroll Supervisor', 13),
 ('Telecommunications', 13),
 ('Thermal Insulator', 13),
 ('Pattern Maker', 12),
 ('Advertising Executive', 12),
 ('Insurance Staff', 12)]

转换和存储

有时我们希望像上面那样计算聚合,但有时我们希望将结果存储到磁盘上以便将来分析。为此,我们可以使用诸如 to_textfilesjson.dumps 之类的方法,或者我们可以转换为 Dask DataFrame 并使用它们的存储系统,这将在下一节中详细介绍。

[11]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk
[11]:
['/home/runner/work/dask-examples/dask-examples/data/processed.0.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.1.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.2.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.3.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.4.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.5.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.6.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.7.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.8.json',
 '/home/runner/work/dask-examples/dask-examples/data/processed.9.json']

转换为 Dask DataFrame

Dask Bag 非常适合读取初始数据,进行少量预处理,然后将其传递给其他更有效的形式,例如 Dask DataFrame。Dask DataFrame 内部使用 Pandas,因此在数值数据上速度更快,并且具有更复杂的算法。

然而,Dask DataFrame 也需要数据以扁平列的形式组织。它对嵌套的 JSON 数据支持不太好(Bag 更适合处理此类数据)。

在这里,我们创建一个函数来展平嵌套数据结构,将其映射到记录上,然后将其转换为 Dask DataFrame。

[12]:
b.take(1)
[12]:
({'age': 61,
  'name': ['Emiko', 'Oliver'],
  'occupation': 'Medical Student',
  'telephone': '166.814.5565',
  'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
  'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},)
[13]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']
    }

b.map(flatten).take(1)
[13]:
({'age': 61,
  'occupation': 'Medical Student',
  'telephone': '166.814.5565',
  'credit-card-number': '3792 459318 98518',
  'credit-card-expiration': '12/23',
  'name': 'Emiko Oliver',
  'street-address': '645 Drumm Line',
  'city': 'Kennewick'},)
[14]:
df = b.map(flatten).to_dataframe()
df.head()
[14]:
年龄 职业 电话 信用卡号 信用卡有效期 姓名 街道地址 城市
0 61 医学生 166.814.5565 3792 459318 98518 12/23 Emiko Oliver 645 Drumm Line Kennewick
1 54 拖拉机司机 1-975-090-1672 4600 5899 6829 6887 11/25 Wendolyn Ortega 1274 Harbor Court Mustang
2 33 医生 107-044-4885 3464 081512 23342 03/20 Alvin Rich 1242 Vidal Plantation Wyandotte
3 34 顾问 219-748-6795 4018 1801 8111 7757 08/23 Toccara Rogers 252 Sampson Drive Parma Heights
4 34 平面设计师 1-509-313-7125 4886 7380 4681 0434 05/18 Randal Roberts 767 Telegraph Side road 纽约

现在我们可以执行与之前相同的计算,但现在使用 Pandas 和 Dask DataFrame。

[15]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
[15]:
Merchant                 16
Coroner                  14
Thermal Insulator        13
Book Binder              13
Payroll Supervisor       13
Medical Practitioner     13
Telecommunications       13
Optometrist              12
Advertising Assistant    12
Care Manager             12
Name: occupation, dtype: int64

了解更多

您可能对以下链接感兴趣