📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 状态处理器
  • 发送一个简单的通知
  • 响应状态变化
  • 使用多个状态处理器
  • 高级状态处理器API

Was this helpful?

  1. 核心概念

通知和状态处理器

PreviousMapping.映射Next持久化缓存

Last updated 5 years ago

Was this helpful?

告警,通知以及对task状态的动态响应是任何工作流工具的重要特性。使用Prefect原语,用户可以添加Prefect的触发器逻辑使得创建的指定task运行或失败后发送通知。这将起作用,但不能涵盖通知逻辑的更多细微用法(例如,如果task重试,则接收通知)。因此,Prefect引入了一种灵活的概念抽象,称为“状态处理器”,可以附加到各个task或flow。从高层次上讲,状态处理器是在附着对象的每次状态更改时调用的函数;它们可以用于在发生故障时发送警报,成功时发送电子邮件,或者根据新旧状态中包含的信息进行更细微的处理。

除了直接使用state_handlerAPI外,Prefect还提供了更高级的装饰器来实现常见的用例方案,例如故障回调。

Prefect状态

状态处理器与Prefect的“状态”概念抽象密切相关。我们建议先阅读有关的,然后再继续深入下去。

状态处理器

让我们从状态处理器的定义开始:

def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
    """
    Any function with this signature can serve as a state handler.

    Args:
        - obj (Union[Task, Flow]): the underlying object to which this state handler
            is attached
        - old_state (State): the previous state of this object
        - new_state (State): the proposed new state of this object

    Returns:
        - Optional[State]: the new state of this object (typically this is just `new_state`)
    """
    pass

如你所见,状态处理器API是一种在每次状态更改时执行任意Python代码的简单方法。

一个简单的例子将阐明其用法:

from prefect import Task, Flow

def my_state_handler(obj, old_state, new_state):
    msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
    print(msg.format(obj, old_state, new_state))
    return new_state

my_flow = Flow(name="state-handler-demo",
               tasks=[Task()],
               state_handlers=[my_state_handler])
my_flow.run()

忽略日志,应该输出:

Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")

Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")

以完全相同的方式,我们可以将此状态处理器附加到单个task而不是flow:

t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()

再次忽略日志,这应该输出:

Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")

Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")

归根结底,仅此而已!但是,API的简单性掩盖了此特性的许多可能的使用模式,这就是我们接下来要介绍的内容。

注意

为了简单起见,在本文档的其余部分中,我们将重点介绍task状态处理器,但是我们讨论的所有内容均适用于flow。

发送一个简单的通知

如我们上面的例子所示,拦截task状态并对其进行响应非常容易。实际上,我们前面的示例可以认为是一个简单的通知系统,在该系统中,我们将通知打印到stdout。让我们更进一步,编写一个通知程序,每当我们的task完成运行时,该通知程序就会向Slack发布更新。为了使此示例正常工作,需要传入Webhook进行Slack应用设置。如果不使用Slack或没有应用,则无需担心,只需将Slack URL与你可能会访问的任何其他web服务交换即可。

import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state


t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()

在这里,我们通过仅在task状态被视为完成时发送通知来响应状态。这包括Success状态和Failed状态,但不包括诸如Retrying或Scheduled状态。

通知失败会使task变成失败

如果POST请求返回非200状态码,则可能引发错误。很好,但是要警告,Prefect将状态处理器视为task执行的组成部分,因此,如果在调用task的状态处理器时引发错误,则task实例运行将被中止,并且该task将被标记为Failed状态。

关于第三方通知API的认证。

状态处理器可以使用Prefect Secrets

大多数通知系统将需要某种形式的身份认证。不要失望,状态处理器可以像task一样查询使用Prefect Secrets。

响应状态变化

到目前为止,我们的大多数示例都会告知用户task何时会不会进入特定状态。Prefect的state对象包含丰富的数据,使我们可以发挥更大的创造力!让我们重新访问post_to_slack通知程序,让它在task进入Retrying状态时提醒我们,以及等待多久重试发生:

import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_retrying():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state


t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run() # the notifier is never run

这例子使用Retrying状态的start_time属性向用户发出更多有用的信息的警告。

创造性思考

因为Prefect允许task返回数据,所以实际上我们可以使状态处理器根据task的输出进行响应。更加有趣的是,任何Prefect状态都可以携带数据,其中包括Failed状态。

让我们实现一个具有特殊失败模式的task。如果发生此故障模式,我们希望立即得到警告。

from prefect import task, Flow
from prefect.engine import signals

def alert_on_special_failure(task, old_state, new_state):
    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            print("Special failure mode!  Send all the alerts!")
            print("a == b == {}".format(new_state.result.value))

    return new_state


@task(state_handlers=[alert_on_special_failure])
def mission_critical_task(a, b):
    if a == b:
        fail_signal = signals.FAIL("a equaled b!")
        fail_signal.flag = True
        fail_signal.value = a
        raise fail_signal
    else:
        return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = mission_critical_task(1, 1)

flow.run()
# Special failure mode!  Send all the alerts!
# a == b == 1

注意我们能复用这种将数据附加到跨越多个task的FAIL信号,因此,在其他情况也可以复用此状态处理器。

使用多个状态处理器

你可能已经注意到state_handlers参数为复数形式,并可以接受列表。这是因为Prefect允许你根据需要将尽可能多的状态处理器附加到task上!此模式对于组合具有不同用例场景的状态处理器很有用(例如,用于失败的特殊处理器和用于重试的特殊处理器)。如果要在某些紧急情况下向你发出警报,它也很有用,你可以实现许多不同的状态处理器,以确保尽快收到警报。

依次调用状态处理器

如果选择为一个task提供多个状态处理器,请注意将按照提供它们的顺序依次调用它们。

高级状态处理器API

Prefect提供了用于从更小、更模块化的部分创建状态处理器的工具。特别是,位于prefect.utilities.notifications中的callback_factoryhelper的工具函数使你可以从两个简单的函数创建状态处理器,一个实现动作,另一个执行检查(或过滤)以确定何时应执行该动作。让我们使用此工具重新实现post_to_slack重试处理功能:

from prefect.utilities.notifications import callback_factory

def send_post(task, state):
    msg = "Task {0} failed and is retrying at {1}".format(task, state.start_time)
    requests.post("https://XXXXX", json={"text": msg})

post_to_slack = callback_factory(send_post, lambda s: s.is_retrying())

你可以检查该状态处理器的行为是否与我们实现的第一个相同。这种工厂方法允许用户使用简单的API来混合和匹配逻辑部分。

常见检查是状态是否为失败。因此,Prefect提供了高级的API来构造失败回调。特别是给定带签名的函数。

def f(obj: Union[Task, Flow], state: State) -> None

可以在task和flow中使用on_failure关键字参数来自动创建适当的状态处理器。

状态概念文档
Prefect官网
英版原文
联系译者