通知和状态处理器
告警,通知以及对task状态的动态响应是任何工作流工具的重要特性。使用Prefect原语,用户可以添加Prefect的触发器逻辑使得创建的指定task运行或失败后发送通知。这将起作用,但不能涵盖通知逻辑的更多细微用法(例如,如果task重试,则接收通知)。因此,Prefect引入了一种灵活的概念抽象,称为“状态处理器”,可以附加到各个task或flow。从高层次上讲,状态处理器是在附着对象的每次状态更改时调用的函数;它们可以用于在发生故障时发送警报,成功时发送电子邮件,或者根据新旧状态中包含的信息进行更细微的处理。
除了直接使用state_handlerAPI外,Prefect还提供了更高级的装饰器来实现常见的用例方案,例如故障回调。
Prefect状态
状态处理器与Prefect的“状态”概念抽象密切相关。我们建议先阅读有关的状态概念文档,然后再继续深入下去。
状态处理器
让我们从状态处理器的定义开始:
def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
"""
Any function with this signature can serve as a state handler.
Args:
- obj (Union[Task, Flow]): the underlying object to which this state handler
is attached
- old_state (State): the previous state of this object
- new_state (State): the proposed new state of this object
Returns:
- Optional[State]: the new state of this object (typically this is just `new_state`)
"""
pass
如你所见,状态处理器API是一种在每次状态更改时执行任意Python代码的简单方法。
一个简单的例子将阐明其用法:
from prefect import Task, Flow
def my_state_handler(obj, old_state, new_state):
msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
print(msg.format(obj, old_state, new_state))
return new_state
my_flow = Flow(name="state-handler-demo",
tasks=[Task()],
state_handlers=[my_state_handler])
my_flow.run()
忽略日志,应该输出:
Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")
Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")
以完全相同的方式,我们可以将此状态处理器附加到单个task而不是flow:
t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
再次忽略日志,这应该输出:
Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")
Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")
归根结底,仅此而已!但是,API的简单性掩盖了此特性的许多可能的使用模式,这就是我们接下来要介绍的内容。
注意
为了简单起见,在本文档的其余部分中,我们将重点介绍task状态处理器,但是我们讨论的所有内容均适用于flow。
发送一个简单的通知
如我们上面的例子所示,拦截task状态并对其进行响应非常容易。实际上,我们前面的示例可以认为是一个简单的通知系统,在该系统中,我们将通知打印到stdout。让我们更进一步,编写一个通知程序,每当我们的task完成运行时,该通知程序就会向Slack发布更新。为了使此示例正常工作,需要传入Webhook进行Slack应用设置。如果不使用Slack或没有应用,则无需担心,只需将Slack URL与你可能会访问的任何其他web服务交换即可。
import requests
from prefect import Task, Flow
def post_to_slack(task, old_state, new_state):
if new_state.is_finished():
msg = "Task {0} finished in state {1}".format(task, new_state)
# replace URL with your Slack webhook URL
requests.post("https://XXXXX", json={"text": msg})
return new_state
t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
在这里,我们通过仅在task状态被视为完成时发送通知来响应状态。这包括Success状态和Failed状态,但不包括诸如Retrying或Scheduled状态。
通知失败会使task变成失败
如果POST请求返回非200状态码,则可能引发错误。很好,但是要警告,Prefect将状态处理器视为task执行的组成部分,因此,如果在调用task的状态处理器时引发错误,则task实例运行将被中止,并且该task将被标记为Failed状态。
关于第三方通知API的认证。
状态处理器可以使用Prefect Secrets
大多数通知系统将需要某种形式的身份认证。不要失望,状态处理器可以像task一样查询使用Prefect Secrets。
响应状态变化
到目前为止,我们的大多数示例都会告知用户task何时会不会进入特定状态。Prefect的state对象包含丰富的数据,使我们可以发挥更大的创造力!让我们重新访问post_to_slack通知程序,让它在task进入Retrying状态时提醒我们,以及等待多久重试发生:
import requests
from prefect import Task, Flow
def post_to_slack(task, old_state, new_state):
if new_state.is_retrying():
msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)
# replace URL with your Slack webhook URL
requests.post("https://XXXXX", json={"text": msg})
return new_state
t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run() # the notifier is never run
这例子使用Retrying状态的start_time属性向用户发出更多有用的信息的警告。
创造性思考
因为Prefect允许task返回数据,所以实际上我们可以使状态处理器根据task的输出进行响应。更加有趣的是,任何Prefect状态都可以携带数据,其中包括Failed状态。
让我们实现一个具有特殊失败模式的task。如果发生此故障模式,我们希望立即得到警告。
from prefect import task, Flow
from prefect.engine import signals
def alert_on_special_failure(task, old_state, new_state):
if new_state.is_failed():
if getattr(new_state.result, "flag", False) is True:
print("Special failure mode! Send all the alerts!")
print("a == b == {}".format(new_state.result.value))
return new_state
@task(state_handlers=[alert_on_special_failure])
def mission_critical_task(a, b):
if a == b:
fail_signal = signals.FAIL("a equaled b!")
fail_signal.flag = True
fail_signal.value = a
raise fail_signal
else:
return 1 / (b - a)
with Flow(name="state-inspection-handler") as flow:
result = mission_critical_task(1, 1)
flow.run()
# Special failure mode! Send all the alerts!
# a == b == 1
注意我们能复用这种将数据附加到跨越多个task的FAIL信号,因此,在其他情况也可以复用此状态处理器。
使用多个状态处理器
你可能已经注意到state_handlers参数为复数形式,并可以接受列表。这是因为Prefect允许你根据需要将尽可能多的状态处理器附加到task上!此模式对于组合具有不同用例场景的状态处理器很有用(例如,用于失败的特殊处理器和用于重试的特殊处理器)。如果要在某些紧急情况下向你发出警报,它也很有用,你可以实现许多不同的状态处理器,以确保尽快收到警报。
依次调用状态处理器
如果选择为一个task提供多个状态处理器,请注意将按照提供它们的顺序依次调用它们。
高级状态处理器API
Prefect提供了用于从更小、更模块化的部分创建状态处理器的工具。特别是,位于prefect.utilities.notifications中的callback_factoryhelper的工具函数使你可以从两个简单的函数创建状态处理器,一个实现动作,另一个执行检查(或过滤)以确定何时应执行该动作。让我们使用此工具重新实现post_to_slack重试处理功能:
from prefect.utilities.notifications import callback_factory
def send_post(task, state):
msg = "Task {0} failed and is retrying at {1}".format(task, state.start_time)
requests.post("https://XXXXX", json={"text": msg})
post_to_slack = callback_factory(send_post, lambda s: s.is_retrying())
你可以检查该状态处理器的行为是否与我们实现的第一个相同。这种工厂方法允许用户使用简单的API来混合和匹配逻辑部分。
常见检查是状态是否为失败。因此,Prefect提供了高级的API来构造失败回调。特别是给定带签名的函数。
def f(obj: Union[Task, Flow], state: State) -> None
可以在task和flow中使用on_failure关键字参数来自动创建适当的状态处理器。
Last updated
Was this helpful?