📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page

Was this helpful?

  1. 初级教程

使用Parameters

跟随终端演示:

cd examples/tutorial
python 03_parameterized_etl_flow.py

上一节教程中,我们将Aircraft ETL脚本重构为Prefect flow。但是,extract_live_data task是硬编码,只能提取特定区域内的航班数据,在这种情况下,Dulles国际机场周围的半径为200公里:

@task
def extract_live_data():
    # Get the live aircraft vector data around Dulles airport
    dulles_airport_position = aclib.Position(lat=38.9519444444, long=-77.4480555556)
    area_surrounding_dulles = aclib.bounding_box(dulles_airport_position, radius_km=200)

    print("fetching live aircraft data...")
    raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area_surrounding_dulles)

    return raw_aircraft_data

允许从广泛范围获取数据,而不仅仅是在单个机场附近,这是理想方案。一种方法是允许extract_live_data采用经度和纬度参数。但是,我们可以走得更远:事实证明,我们的关联数据中有已经机场位置信息,可以利用!

让我们重构我们的Python函数以从关联数据获取用户指定的机场:

@task
def extract_live_data(airport, radius, ref_data):
    # Get the live aircraft vector data around the given airport (or none)
    area = None
    if airport:
        airport_data = ref_data.airports[airport]
        airport_position = aclib.Position(
            lat=float(airport_data["latitude"]), long=float(airport_data["longitude"])
        )
        area = aclib.bounding_box(airport_position, radius)

    print("fetching live aircraft data...")
    raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area)

    return raw_aircraft_data

假如你感到好奇,area=None将不会获取所有已知航班的实时数据,而不管其所在的区域如何。

如何在Prefect flow中控制这些函数参数呢?通过使用prefect.Parameter:

from prefect import Parameter

# ...task definitions...

with Flow("Aircraft-ETL") as flow:
    airport = Parameter("airport", default="IAD")
    radius = Parameter("radius", default=200)

    reference_data = extract_reference_data()
    live_data = extract_live_data(airport, radius, reference_data)

    transformed_live_data = transform(live_data, reference_data)

    load_reference_data(reference_data)
    load_live_data(transformed_live_data)

就像task一样,直到调用flow.run()时,才使用默认值(如果提供的话)或传递给.run()的覆盖值设定参数:

# Run the Flow with default airport=IAD & radius=200
flow.run()

# ...default radius and a different airport!
flow.run(airport="DCA")

最后,请注意我们的执行图已改变:现在获取实时数据取决于获取到的关联数据:

接下来,如果一个task失败了会怎样,以及我们如何制定出现问题时采取的措施?

PreviousPrefect实现ETLNext失败处理

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者