📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 提取、转换、加载
  • flow
  • 命令式flow

Was this helpful?

  1. 进阶教程

ETL

数据工程的入门例子,就像"hello, world!"对于编程入门一样

ETL(提取、转换、加载)是基本的数据工作流,但是在大多数数据工程框架中进行设置可能会是令人惊讶地复杂。

某些工具没有在task之间传递数据的简便方法,导致公司维护所有可能的来源和接收者组合的内部表示(S3_to_Redshift,S3_to_S3,Redshift_to_S3,Postgres_to_Redshift,Postgres_to_S3等)。其他框架具有灵活的来源和接收者,但缺乏编写完全可定制的转换的能力。

Prefect使ETL变得容易。

提取、转换、加载

开始定义三个函数,将用作我们的提取,转换和加载task。可以在这些函数中放入所需的任何内容,作为说明,我们将使用一个简单的数组。

为了唤醒Prefect机制,我们要做的就是将@task装饰器应用于函数。

在task之间传递数据

不必担心在task之间传递大型数据对象。只要适合内存,Prefect都可以使用特殊设置进行处理。如果不是这样,则有多种方法可以在整个集群中分配操作。

from prefect import task

@task
def extract():
    """Get a list of data"""
    return [1, 2, 3]

@task
def transform(data):
    """Multiply the input by 10"""
    return [i * 10 for i in data]

@task
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))

flow

现在我们有了task,要创建一个flow并像调用函数一样调用task。在后台,Prefect正在生成一个计算图,该图跟踪我们task之间的所有依赖关系。

延迟执行

看起来我们正在调用ETL函数,但实际上没有执行任何操作。在Prefect中,调用task是告诉框架与其他task的关系的便捷方法。Prefect使用该信息来构建计算图。在调用flow.run()之前,实际上什么也没有发生。

from prefect import Flow

with Flow('ETL') as flow:
    e = extract()
    t = transform(e)
    l = load(t)

flow.run() # prints "Here's your data: [10, 20, 30]"

如果调用flow.visualize(),Prefect绘出计算图。

命令式flow

我们喜欢Prefect的函数式API,但有些用户可能喜欢更明确的方法。幸运的是,Prefect也可以通过命令式API进行操作。

这是使用命令式API构建的相同流程:

from prefect import Flow

flow = Flow('ETL')
flow.set_dependencies(transform, keyword_tasks=dict(data=extract))
flow.set_dependencies(load, keyword_tasks=dict(data=transform))

flow.run() # prints "Here's your data: [10, 20, 30]"

混搭

Prefect的函数式API和命令式API可以随时使用,即使脚本中的一行使用其他风格也可以使用。唯一的主要区别是函数式API要求代码在活动flow上下文中运行。

PreviousDask部署Next本地调试

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者
ETL