📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • Extract,Transform,Load
  • 套用Prefect
  • 第1步
  • 第2步
  • 第3步

Was this helpful?

  1. 初级教程

Prefect实现ETL

在本教程中,我们将使用Prefect来改进前面章节中ETL工作流的整体结构。

跟随终端演示:

教学代码一览

cd examples/tutorial
python 02_etl_flow.py

Extract,Transform,Load

Prefect的最小工作单元是Python函数。因此,我们的首要task是将示例分解成函数。

可能想到的第一个问题是“函数应该有多大/小?”。一种简单的方法是显式地将工作分解为“提取”,“转换”和“加载”函数,如下所示:

def extract(...):
    # fetch all aircraft and reference data
    ...
    return all_data

def transform(all_data):
    # clean the live data
    ...

def load(all_data):
    # save all transformed data and reference data to the database
    ...

这比较函数式,但它仍然不能解决原来的代码中的一些问题:

  • 如果提取实时航班数据失败,已经获取的关联数据怎么办?

  • 如果存储数据库不可用,已经转换的数据怎么办?

这些要点突出了一个事实,即.extract()和.load()仍是任意范围的。在决定每个功能的大小时,这使我们有了一个经验法则:查看工作流在每个步骤所需的输入和输出数据。在我们的场景,关联数据和实时航班数据来自不同的来源,并分开存储。考虑到这一新见解,让我们进行更多重构:

def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

def extract_live_data(...):
    # fetch live data
    ...
    return live_data

def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

def load_reference_data(reference_data):
    # save reference data to the database
    ...

def load_live_data(transformed_data):
    # save transformed live data to the database
    ...

套用Prefect

现在我们有适当大小的函数,并且知道这些函数之间的关系,可以用Prefect封装我们的工作流。

第1步

使用prefect.task装饰需要Prefect调度执行的任何函数:

from prefect import task, Flow

@task
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

@task
def extract_live_data(...):
    # fetch live data
    ...
    return live_data

@task
def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

@task
def load_reference_data(reference_data):
    # save reference data to the database
    ...

@task
def load_live_data(transformed_data):
    # save transformed live data to the database
    ...

第2步

在prefect.Flow上下文中指定业务数据和task依赖关系:

# ...task definitions above

with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()

    transformed_live_data = transform(live_data, reference_data)

    load_reference_data(reference_data)
    load_live_data(transformed_live_data)

注意:此时没有实际执行任何task,因为使用Flow(...):上下文管理器允许Prefect推理出task之间的依赖关系,并构建稍后将执行的执行图。这个业务场景的执行图如下所示:

与最初的实现相比有了很大的改进!

第3步

执行Flow

# ...flow definition above

flow.run()

此时,以适当的顺序执行Task(我们的Python函数),并按照执行图中的指定从task到task传递业务数据。

Prefect Task库

Prefect提供一个task库,其中包含常见的task实现以及与Kubernetes,GitHub,Slack,Docker,AWS,GCP等的集成!

接下来,让我们对flow进行参数化以使其复用性更好。

PreviousETL介绍Next使用Parameters

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者
Prefect Aircraft ETL