使用 Prefect 构建 ETL 流水线

实时 Notebook

您可以在 实时会话 中运行此 notebook Binder 或在 Github 上查看它。

使用 Prefect 构建 ETL 流水线

Prefect 是一个用于自动化数据工作流的平台。数据工程师和数据科学家可以构建、测试和部署生产流水线,而无需担心生产中的所有“负面工程”方面。例如,Prefect 使得部署一个在复杂计划下运行、在失败时需要任务重试、并在某些任务完成时发送通知的工作流变得容易。Prefect 构建在 Dask 之上,并依赖 Dask 在分布式环境中调度和管理 Prefect 工作流的执行。

本示例演示了如何在 Dask 上运行 Prefect ETL Flow,最终创建一个 GIF。虽然这在 Prefect 的用例中有些不寻常,但我们对非传统用例并不陌生。

在工作流引擎领域,Prefect 支持许多独特的功能;在这个特定的示例中,我们将看到

  • 工作流参数化

  • 工作流任务的动态运行时“映射”

  • 可定制的执行逻辑

这是其他任何引擎都无法提供的。

目录

  1. 目标描述

  2. 构建我们的 Flow

    1. 提取

    2. 转换

    3. 加载

    4. 组合各个部分

  3. 在 Dask 上运行我们的 Flow

  4. 观看我们的 GIF

目标

为了演示 Prefect 和 Dask 如何协同工作,我们将构建并执行一个标准的“提取 / 转换 / 加载”(ETL)工作流来处理一些基本的图像数据。大多数 ETL 工作流涉及将数据从一个数据库按计划迁移到另一个数据库。在本例中,我们将把位于已知 URL 的数据文件移动到本地硬盘,将单个文件转换为一系列帧,并将这些帧编译成 GIF。该 URL 引用了一个包含原始字节的文件,例如

b"""aÙw˜≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô••µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV!HCDBB5;5?"""

我们工作流的步骤如下

  1. 提取:从 URL(由 Parameter 指定)拉取数据文件到磁盘

  2. 转换:将文件分割成多个文件,每个文件对应一个单帧

  3. 加载:单独存储每个帧,并将这些帧编译成一个 GIF

一旦我们构建了 Flow,我们可以使用 Parameter 的不同值来执行它,甚至可以按夜间计划运行它。

注意:如果计划在真正的分布式环境中执行此 Flow,将图像写入本地文件系统将是合适的。我们应该转而使用外部数据存储,例如 Google Cloud Storage 或适当的数据库。

提取

首先,我们将定义任务,用于从给定 URL 提取图像数据文件并将其保存到给定文件位置。为此,我们将利用两种创建 Prefect 任务的方法:- task 装饰器,用于将任何 Python 函数转换为任务 - 来自 Prefect“任务库”的预编写、可配置任务,它帮助我们抽象一些标准样板代码

此外,我们将利用以下 Prefect 概念:- 一个 Prefect 信号,用于标记此任务及其下游依赖项在本地文件系统已存在文件时成功“跳过” - 重试语义:如果由于某种原因,我们的 curl 命令连接失败,我们希望它重试最多 2 次,每次延迟 10 秒。这样,如果按计划运行此工作流,我们将无需担心临时的间歇性连接问题。

现在我们只是在定义单个任务 - 直到创建完整的 Flow 之前,我们实际上不会设置依赖结构。

[1]:
import datetime
import os

import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask


@task
def curl_cmd(url: str, fname: str) -> str:
    """
    The curl command we wish to execute.
    """
    if os.path.exists(fname):
        raise SKIP("Image data file already exists.")
    return "curl -fL -o {fname} {url}".format(fname=fname, url=url)


# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code

download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))

转换

接下来,我们需要定义加载图像数据文件并将其分割成多个帧的任务。在本例中,每个帧由 4 个换行符分隔。注意,如果前两个任务被“跳过”,Prefect 中的默认行为是也跳过下游依赖项。然而,与 Prefect 中的大多数事物一样,此行为是可定制的。在本例中,我们希望无论上游是否跳过,此任务都能运行,因此我们将 skip_on_upstream_skip 标志设置为 False

[2]:
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
    """
    Loads image data file at `fname` and splits it into
    multiple frames.  Returns a list of bytes, one element
    for each frame.
    """
    with open(fname, "rb") as f:
        images = f.read()

    return [img for img in images.split(b"\n" * 4) if img]

加载

最后,我们想将帧写入磁盘,并将这些帧组合成一个 GIF。为了实现此目标,我们将利用 Prefect 的任务“映射”功能,该功能方便地根据上游输出生成新任务。在这种情况下,我们将编写一个用于将图像写入磁盘的单个任务,并对上面由 load_and_split 返回的所有图像帧进行“映射”!为了推断当前是哪个帧,我们查看 `prefect.context <https://docs.prefect.io/guide/core_concepts/execution.html#context>`__。

此外,我们可以对映射任务进行“规约”(reduce)——在本例中,我们将获取映射任务的集合,并将它们传递到我们的 combine_to_gif 任务中,用于创建和保存我们的 GIF。

[3]:
@task
def write_to_disk(image: bytes) -> bytes:
    """
    Given a single image represented as bytes, writes the image
    to the present working directory with a filename determined
    by `map_index`.  Returns the image bytes.
    """
    frame_no = prefect.context.get("map_index")
    with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)
    return image
[4]:
import imageio
from io import BytesIO


@task
def combine_to_gif(image_bytes: list) -> None:
    """
    Given a list of ordered images represented as bytes,
    combines them into a single GIF stored in the present working directory.
    """
    images = [imageio.imread(BytesIO(image)) for image in image_bytes]
    imageio.mimsave('./clip.gif', images)

构建 Flow

最后,我们需要将任务组合成一个 Prefect “Flow”。与 Dask 的 delayed 接口类似,所有计算都被延迟,在此步骤中不会执行任何任务代码。由于 Prefect 在任务之间维护着更严格的契约,并且还需要在非 Dask 执行环境中运行的能力,因此延迟执行的机制独立于 Dask。

除了我们已经定义的任务,我们还引入了两个“参数”(Parameters),用于指定数据的 URL 和本地文件位置。在运行时,我们可以选择覆盖这些任务以返回不同的值。

[5]:
from prefect import Parameter, Flow


DATA_URL = Parameter("DATA_URL",
                     default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")

DATA_FILE = Parameter("DATA_FILE", default="image-data.img")


with Flow("Image ETL") as flow:

    # Extract
    command = curl_cmd(DATA_URL, DATA_FILE)
    curl = download(command=command)

    # Transform
    # we use the `upstream_tasks` keyword to specify non-data dependencies
    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])

    # Load
    frames = write_to_disk.map(images)
    result = combine_to_gif(frames)


flow.visualize()
[5]:
../_images/applications_prefect-etl_10_0.svg

在 Dask 上运行 Flow

现在我们已经构建了 Flow,它独立于 Dask。我们可以按顺序执行此 Flow,一个任务接一个任务,但在我们将图像映射到文件的过程中存在固有的并行性,我们希望加以利用。幸运的是,Dask 使得这很容易实现。

首先,我们将启动一个本地 Dask 集群。然后,我们将使用 Prefect 的 DaskExecutor 运行我们的 Flow,它将把每个任务提交到我们的 Dask 集群,并使用 Dask 的分布式调度器来确定每个任务何时何地运行。本质上,我们构建了一个有向无环图 (DAG),并简单地将该 DAG“提交”给 Dask,以便以分布式方式处理其执行。

[6]:
# start our Dask cluster
from dask.distributed import Client


client = Client(n_workers=4, threads_per_worker=1)

# point Prefect's DaskExecutor to our Dask cluster

from prefect.executors import DaskExecutor

executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)
[2022-07-27 19:17:06+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Image ETL'
[2022-07-27 19:17:06+0000] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://127.0.0.1:41453
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/scheduler.py:4949: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/worker.py:4606: UserWarning: Large object of size 1.94 MiB detected in task graph:
  {'task': <Task: write_to_disk>, 'state': None, 'up ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
[2022-07-27 19:17:09+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/prefect/executors/dask.py:329: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  scheduler_comm.close_rpc()
[6]:
<Success: "All reference tasks succeeded.">

下一步

现在我们已经构建了工作流,接下来做什么?感兴趣的读者应该尝试

  • 再次运行 Flow,看看 SKIP 信号的行为如何

  • 对 URL 和文件位置使用不同的参数(可以通过将参数名称作为关键字参数传递给 flow.run() 来覆盖参数值)

  • 为最终 GIF 的文件名引入一个新的 Parameter

  • 使用 Prefect 的 调度器接口按计划运行我们的工作流

播放

最后,让我们观看我们的作品吧!

[7]:
from IPython.display import Image

Image(filename="clip.gif", alt="Rick Daskley")
[7]:
<IPython.core.display.Image object>