def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
"""
Any function with this signature can serve as a state handler.
Args:
- obj (Union[Task, Flow]): the underlying object to which this state handler
is attached
- old_state (State): the previous state of this object
- new_state (State): the proposed new state of this object
Returns:
- Optional[State]: the new state of this object (typically this is just `new_state`)
"""
pass
如你所见,状态处理器API是一种在每次状态更改时执行任意Python代码的简单方法。
一个简单的例子将阐明其用法:
from prefect import Task, Flow
def my_state_handler(obj, old_state, new_state):
msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
print(msg.format(obj, old_state, new_state))
return new_state
my_flow = Flow(name="state-handler-demo",
tasks=[Task()],
state_handlers=[my_state_handler])
my_flow.run()
忽略日志,应该输出:
Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")
Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")
以完全相同的方式,我们可以将此状态处理器附加到单个task而不是flow:
t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
再次忽略日志,这应该输出:
Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")
Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")
import requests
from prefect import Task, Flow
def post_to_slack(task, old_state, new_state):
if new_state.is_retrying():
msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)
# replace URL with your Slack webhook URL
requests.post("https://XXXXX", json={"text": msg})
return new_state
t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run() # the notifier is never run
from prefect import task, Flow
from prefect.engine import signals
def alert_on_special_failure(task, old_state, new_state):
if new_state.is_failed():
if getattr(new_state.result, "flag", False) is True:
print("Special failure mode! Send all the alerts!")
print("a == b == {}".format(new_state.result.value))
return new_state
@task(state_handlers=[alert_on_special_failure])
def mission_critical_task(a, b):
if a == b:
fail_signal = signals.FAIL("a equaled b!")
fail_signal.flag = True
fail_signal.value = a
raise fail_signal
else:
return 1 / (b - a)
with Flow(name="state-inspection-handler") as flow:
result = mission_critical_task(1, 1)
flow.run()
# Special failure mode! Send all the alerts!
# a == b == 1