📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 斐波那契数列的例子
  • task循环介绍
  • 拼凑完整
  • 进一步

Was this helpful?

  1. 进阶教程

动态DAG和Task循环

Prefect的丰富状态系统允许在运行时动态更改基础DAG结构进行工作流变形,同时仍提供基础工作流所有的原本保证,单个task可以具有自定义重试设置,交换数据,激活通知等。

以前,task映射允许用户在运行时将可并行化的for循环提升为一等公民的并行管道。task循环提供了几乎相同的好处,除了对于需要使用while循环模式的情况。

斐波那契数列的例子

作为一个鼓励例子,让我们实现一个Prefect flow,该流程计算小于给定数M的最大斐波那契数。对于此示例,我们将考虑斐波那契数的计算是一个只能通过API访问的完全黑盒。我们的实现将通过迭代查询下一个数字来进行,直到达到一个大于M的数字为止。将这种模式表达为工作流引出一个独特的问题,因为我们在运行时之前不知道需要多少个task。为了说明,请考虑斐波那契task的以下实现:

import requests
from prefect import task


@task
def compute_large_fibonacci(M):
    n = 1
    fib = 1

    while True:
        next_fib = requests.post(
            "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
        ).json()
        if next_fib > M:
            break
        else:
            fib = next_fib
            n += 1

    return fib

假设我们的互联网连接短暂中断并且此task失败。哪个n值引起了故障,如何从n重新启动?为了捕获此类间歇性问题,我们可能考虑为该task提供重试设置,但这将导致整个循环重新运行。如果将每个n值单独都视为具有其自身设置(重试,通知等)的Prefect Task,这是否理想?

task循环介绍

目标是将上述while循环的每个单独迭代提升到其自己的Prefect task,该Prefect task可以自己重试和处理。我们无法提前知道需要多少次迭代。幸运的是,Prefect以一等公民的方式支持这种动态模式。

从一个迭代到下一个迭代需要传递两段信息,循环计数以及循环有效负载(可以在各个迭代之间累积或更改)。为了传达此信息,我们将使用Prefect LOOP信号,如下所示:

import requests
from datetime import timedelta

import prefect
from prefect import task
from prefect.engine.signals import LOOP


@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)

    next_fib = requests.post(
        "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
    ).json()

    if next_fib > M:
        return fib  # return statements end the loop

    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))

像所有Prefect信号一样,LOOP信号接受message和result关键字。但是,在这种情况下,结果将包含在task_loop_result键的上下文中,并且可在下一次循环迭代时可用(task_loop_count也可用,但此处不需要该信息)。

拼凑完整

让我们以刚刚学到的内容为基础,将各部分放到一个实际的Prefect flow中。

from prefect import Flow, Parameter

with Flow("fibonacci") as flow:
    M = Parameter("M")
    fib_num = compute_large_fibonacci(M)

作为最佳实践,我们选择将M提升为Prefect参数,而不是对其值进行硬编码。这样,我们可以尝试使用较小的值,并最终在不重新编译Flow的情况下动态设置该值。

借助我们构建的flow,让我们计算小于100的最大斐波纳契数,然后计算1000!

flow_state = flow.run(M=100)
print(flow_state.result[fib_num].result) # 89

flow_state = flow.run(M=1000)
print(flow_state.result[fib_num].result) # 987

如果在本地运行此flow,将看到大量的日志,它们对应于compute_large_fibonacci task的每个迭代,如果这些单个迭代中的任何一个失败,则延迟2秒后task将重试,而成功的迭代无需再次运行!

进一步

Prefect中的循环是一等公民的操作。因此,它可以与映射结合使用,以实现真正的动态工作流!

with Flow("mapped-fibonacci") as mapped_flow:
    ms = Parameter("ms")
    fib_nums = compute_large_fibonacci.map(ms)

需要明确的是,我们编写的flow似乎只有两个task,不知道我们可能提供多少个M值,也不知道每个M都需要多少次迭代!

flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
print(flow_state.result[fib_nums].result) # [8, 89, 987, 987]
PreviousPrefect Task剖析Next结果处理器

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者