使用 Prefect 构建 ETL 流水线
目录
使用 Prefect 构建 ETL 流水线¶
Prefect 是一个用于自动化数据工作流的平台。数据工程师和数据科学家可以构建、测试和部署生产流水线,而无需担心生产中的所有“负面工程”方面。例如,Prefect 使得部署一个在复杂计划下运行、在失败时需要任务重试、并在某些任务完成时发送通知的工作流变得容易。Prefect 构建在 Dask 之上,并依赖 Dask 在分布式环境中调度和管理 Prefect 工作流的执行。
本示例演示了如何在 Dask 上运行 Prefect ETL Flow,最终创建一个 GIF。虽然这在 Prefect 的用例中有些不寻常,但我们对非传统用例并不陌生。
在工作流引擎领域,Prefect 支持许多独特的功能;在这个特定的示例中,我们将看到
工作流参数化
工作流任务的动态运行时“映射”
可定制的执行逻辑
这是其他任何引擎都无法提供的。
目录
目标¶
为了演示 Prefect 和 Dask 如何协同工作,我们将构建并执行一个标准的“提取 / 转换 / 加载”(ETL)工作流来处理一些基本的图像数据。大多数 ETL 工作流涉及将数据从一个数据库按计划迁移到另一个数据库。在本例中,我们将把位于已知 URL 的数据文件移动到本地硬盘,将单个文件转换为一系列帧,并将这些帧编译成 GIF。该 URL 引用了一个包含原始字节的文件,例如
b"""aÙw˜≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô••µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV!HCDBB5;5?"""
我们工作流的步骤如下
提取:从 URL(由
Parameter
指定)拉取数据文件到磁盘转换:将文件分割成多个文件,每个文件对应一个单帧
加载:单独存储每个帧,并将这些帧编译成一个 GIF
一旦我们构建了 Flow,我们可以使用 Parameter
的不同值来执行它,甚至可以按夜间计划运行它。
注意:如果计划在真正的分布式环境中执行此 Flow,将图像写入本地文件系统将不是合适的。我们应该转而使用外部数据存储,例如 Google Cloud Storage 或适当的数据库。
提取¶
首先,我们将定义任务,用于从给定 URL 提取图像数据文件并将其保存到给定文件位置。为此,我们将利用两种创建 Prefect 任务的方法:- task
装饰器,用于将任何 Python 函数转换为任务 - 来自 Prefect“任务库”的预编写、可配置任务,它帮助我们抽象一些标准样板代码
此外,我们将利用以下 Prefect 概念:- 一个 Prefect 信号,用于标记此任务及其下游依赖项在本地文件系统已存在文件时成功“跳过” - 重试语义:如果由于某种原因,我们的 curl
命令连接失败,我们希望它重试最多 2 次,每次延迟 10 秒。这样,如果按计划运行此工作流,我们将无需担心临时的间歇性连接问题。
现在我们只是在定义单个任务 - 直到创建完整的 Flow 之前,我们实际上不会设置依赖结构。
[1]:
import datetime
import os
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask
@task
def curl_cmd(url: str, fname: str) -> str:
"""
The curl command we wish to execute.
"""
if os.path.exists(fname):
raise SKIP("Image data file already exists.")
return "curl -fL -o {fname} {url}".format(fname=fname, url=url)
# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code
download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))
转换¶
接下来,我们需要定义加载图像数据文件并将其分割成多个帧的任务。在本例中,每个帧由 4 个换行符分隔。注意,如果前两个任务被“跳过”,Prefect 中的默认行为是也跳过下游依赖项。然而,与 Prefect 中的大多数事物一样,此行为是可定制的。在本例中,我们希望无论上游是否跳过,此任务都能运行,因此我们将 skip_on_upstream_skip
标志设置为 False
。
[2]:
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
"""
Loads image data file at `fname` and splits it into
multiple frames. Returns a list of bytes, one element
for each frame.
"""
with open(fname, "rb") as f:
images = f.read()
return [img for img in images.split(b"\n" * 4) if img]
加载¶
最后,我们想将帧写入磁盘,并将这些帧组合成一个 GIF。为了实现此目标,我们将利用 Prefect 的任务“映射”功能,该功能方便地根据上游输出生成新任务。在这种情况下,我们将编写一个用于将图像写入磁盘的单个任务,并对上面由 load_and_split
返回的所有图像帧进行“映射”!为了推断当前是哪个帧,我们查看 `prefect.context
<https://docs.prefect.io/guide/core_concepts/execution.html#context>`__。
此外,我们可以对映射任务进行“规约”(reduce)——在本例中,我们将获取映射任务的集合,并将它们传递到我们的 combine_to_gif
任务中,用于创建和保存我们的 GIF。
[3]:
@task
def write_to_disk(image: bytes) -> bytes:
"""
Given a single image represented as bytes, writes the image
to the present working directory with a filename determined
by `map_index`. Returns the image bytes.
"""
frame_no = prefect.context.get("map_index")
with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
f.write(image)
return image
[4]:
import imageio
from io import BytesIO
@task
def combine_to_gif(image_bytes: list) -> None:
"""
Given a list of ordered images represented as bytes,
combines them into a single GIF stored in the present working directory.
"""
images = [imageio.imread(BytesIO(image)) for image in image_bytes]
imageio.mimsave('./clip.gif', images)
构建 Flow¶
最后,我们需要将任务组合成一个 Prefect “Flow”。与 Dask 的 delayed
接口类似,所有计算都被延迟,在此步骤中不会执行任何任务代码。由于 Prefect 在任务之间维护着更严格的契约,并且还需要在非 Dask 执行环境中运行的能力,因此延迟执行的机制独立于 Dask。
除了我们已经定义的任务,我们还引入了两个“参数”(Parameters),用于指定数据的 URL 和本地文件位置。在运行时,我们可以选择覆盖这些任务以返回不同的值。
[5]:
from prefect import Parameter, Flow
DATA_URL = Parameter("DATA_URL",
default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")
DATA_FILE = Parameter("DATA_FILE", default="image-data.img")
with Flow("Image ETL") as flow:
# Extract
command = curl_cmd(DATA_URL, DATA_FILE)
curl = download(command=command)
# Transform
# we use the `upstream_tasks` keyword to specify non-data dependencies
images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])
# Load
frames = write_to_disk.map(images)
result = combine_to_gif(frames)
flow.visualize()
[5]:
在 Dask 上运行 Flow¶
现在我们已经构建了 Flow,它独立于 Dask。我们可以按顺序执行此 Flow,一个任务接一个任务,但在我们将图像映射到文件的过程中存在固有的并行性,我们希望加以利用。幸运的是,Dask 使得这很容易实现。
首先,我们将启动一个本地 Dask 集群。然后,我们将使用 Prefect 的 DaskExecutor
运行我们的 Flow,它将把每个任务提交到我们的 Dask 集群,并使用 Dask 的分布式调度器来确定每个任务何时何地运行。本质上,我们构建了一个有向无环图 (DAG),并简单地将该 DAG“提交”给 Dask,以便以分布式方式处理其执行。
[6]:
# start our Dask cluster
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
# point Prefect's DaskExecutor to our Dask cluster
from prefect.executors import DaskExecutor
executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)
[2022-07-27 19:17:06+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Image ETL'
[2022-07-27 19:17:06+0000] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://127.0.0.1:41453
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/scheduler.py:4949: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
warnings.warn(
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/worker.py:4606: UserWarning: Large object of size 1.94 MiB detected in task graph:
{'task': <Task: write_to_disk>, 'state': None, 'up ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
warnings.warn(
[2022-07-27 19:17:09+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/prefect/executors/dask.py:329: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
scheduler_comm.close_rpc()
[6]:
<Success: "All reference tasks succeeded.">