📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 设置
  • 加一
  • 加两个数字
  • 算术
  • 解析输入

Was this helpful?

  1. 进阶教程

Prefect算子

你的数据工程框架能做这个吗?

Prefect是一种重型数据工作流系统,但它也可以处理轻量级应用程序。

为了说明这一点,让我们构建一个算子。

设置

让我们编写一个轻量函数,使检索计算结果更加容易。我们要做的就是选择终结者task的值。你不需要这样做,但是由于在本教程中我们将要用几次,因此它将使示例更加清楚。

from prefect import task, Flow, Parameter

def run(flow, **parameters):
    state = flow.run(**parameters)
    terminal_task = list(flow.terminal_tasks())[0]
    return state.result[terminal_task].result

加一

有什么事情比对一个数字+1更容易?

with Flow('Add one') as flow:
    result = Parameter('x') + 1

Prefect参数跟其他task一样,除开它们从用户输入中获取值。

测试一下:

assert run(flow, x=1) == 2
assert run(flow, x=2) == 3
assert run(flow, x=-100) == -99

加两个数字

让我们提高一个层次,为什么需要两个输入却只有一个输入?

with Flow('Add x and y') as flow:
    result = Parameter('x') + Parameter('y')

多参数

flow可以具有任意数量的参数,只要它们具有唯一的名称即可。

我们的新算子比较好用:

assert run(flow, x=1, y=1) == 2
assert run(flow, x=40, y=2) == 42

算术

一切都很好,但是让我们给用户一些选择。 我们可以将一个新的op参数与一个开关组合在一起,以使用户可以选择他们想要执行的算子,然后将结果合并为一个单个输出:

from prefect.tasks.control_flow import switch, merge

# note: this will raise some warnings, but they're ok for this use case!
with Flow('Arithmetic') as flow:
    x, y = Parameter('x'), Parameter('y')
    operations = {
        '+': x + y,
        '-': x - y,
        '*': x * y,
        '/': x / y
    }
    switch(condition=Parameter('op'), cases=operations)
    result = merge(*operations.values())

条件分支

Prefect有几种有条件地运行task的方式,包括此处使用的switch和更简单的if/else。

在此实例中,swich将检查op参数的值,然后执行与适当的计算相对应的task。merge函数用于将所有分支合并回单个结果。

现在执行flow,我们提供需要的计算行为:

assert run(flow, x=1, op='+', y=2) == 3
assert run(flow, x=1, op='-', y=2) == -1
assert run(flow, x=1, op='*', y=2) == 2
assert run(flow, x=1, op='/', y=2) == 0.5

解析输入

我们的算术计算器可以工作,但是有点麻烦。让我们编写一个快速的自定义task,以获取一个字符串表达式并将其解析为x,y和op,其余代码与之前相同:

@task
def parse_input(expression):
    x, op, y = expression.split(' ')
    return dict(x=float(x), op=op, y=float(y))

with Flow('Arithmetic') as flow:
    inputs = parse_input(Parameter('expression'))

    # once we have our inputs, everything else is the same:
    x, y = inputs['x'], inputs['y']
    operations = {
        '+': x + y,
        '-': x - y,
        '*': x * y,
        '/': x / y
    }
    switch(condition=inputs['op'], cases=operations)
    result = merge(*operations.values())

@task装饰器

@task装饰器是将函数转换为task的最简单方式。

如何检索task。

索引task

正如我们已经说明可以添加(或减去,或乘以或除以)task一样,也可以为task建立索引。 在这里,我们为输入task的结果建立索引以获取x,y和op。 像其他Prefect操作一样,索引本身也会记录在计算图中,但是将执行推迟到flow运行且索引结果实际可用为止。

现在我们可以在字符串表达式🎉上运行计算器:

assert run(flow, expression='1 + 2') == 3
assert run(flow, expression='1 - 2') == -1
assert run(flow, expression='1 * 2') == 2
assert run(flow, expression='1 / 2') == 0.5

对于更进一步的探索,以下是自动跟踪和生成的计算图Prefect的可视化:

flow.visualize()
Previoustask映射并行和Prefect参数的高级特性Next日志部署

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者
Calculator