from typing import Optional, Union

from prefect import Flow, Task
from prefect.engine.state import State


def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
    """
    Any function with this signature can serve as a state handler.

    Args:
        - obj (Union[Task, Flow]): the underlying object to which this state handler
            is attached
        - old_state (State): the previous state of this object
        - new_state (State): the proposed new state of this object

    Returns:
        - Optional[State]: the new state of this object (typically this is just `new_state`)
    """
    pass
As you can see, the state handler API is a simple way to run arbitrary Python code on every state change.
A simple example illustrates its use:
from prefect import Task, Flow


def my_state_handler(obj, old_state, new_state):
    msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
    print(msg.format(obj, old_state, new_state))
    return new_state


my_flow = Flow(name="state-handler-demo", tasks=[Task()], state_handlers=[my_state_handler])
my_flow.run()
Ignoring the logs, this should print:
Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")

Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")
In exactly the same way, we can attach this state handler to an individual task instead of the flow:
t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
Again ignoring the logs, this should print:
Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")

Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")
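Both runs above call the handler once per state transition. The following sketch is a simplified model of that dispatch, written without Prefect so it can run on its own. `State`, `call_handlers`, `logging_handler`, and `override_handler` are hypothetical names made up for this illustration and are not part of Prefect's API. The rule that a `None` return leaves the proposed state unchanged is an assumption of the sketch.

```python
from typing import Callable, List, Optional


class State:
    """Hypothetical stand-in for a Prefect state; it only carries a name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return "{}()".format(self.name)


Handler = Callable[[object, State, State], Optional[State]]


def call_handlers(obj: object, old_state: State, new_state: State,
                  handlers: List[Handler]) -> State:
    """Call each handler in order; a returned state replaces the proposed one."""
    for handler in handlers:
        returned = handler(obj, old_state, new_state)
        # Assumption of this sketch: None means "keep the current proposal".
        if returned is not None:
            new_state = returned
    return new_state


calls = []


def logging_handler(obj, old_state, new_state):
    # Record the transition and pass the proposed state through unchanged.
    calls.append((repr(old_state), repr(new_state)))
    return new_state


def override_handler(obj, old_state, new_state):
    # A handler may also replace the proposed state with a different one.
    if new_state.name == "Failed":
        return State("Retrying")
    return new_state


final = call_handlers("my-task", State("Running"), State("Failed"),
                      [logging_handler, override_handler])
print(final)  # Retrying()
```

Because handlers run in order, `logging_handler` sees the original `Failed()` proposal, while anything placed after `override_handler` would see `Retrying()`.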
import requests
from prefect import Task, Flow


def post_to_slack(task, old_state, new_state):
    if new_state.is_retrying():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state


t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()  # the notifier is never run
from prefect import task, Flow
from prefect.engine import signals


def alert_on_special_failure(task, old_state, new_state):
    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            print("Special failure mode! Send all the alerts!")
            print("a == b == {}".format(new_state.result.value))

    return new_state


@task(state_handlers=[alert_on_special_failure])
def mission_critical_task(a, b):
    if a == b:
        fail_signal = signals.FAIL("a equaled b!")
        fail_signal.flag = True
        fail_signal.value = a
        raise fail_signal
    else:
        return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = mission_critical_task(1, 1)

flow.run()
# Special failure mode! Send all the alerts!
# a == b == 1
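The example above works by attaching extra attributes to the raised signal and reading them back with `getattr` and a default. The sketch below isolates that pattern in plain Python, with no Prefect dependency. `FailSignal`, `risky_divide`, `describe_failure`, and `run` are hypothetical names used only for this illustration.

```python
class FailSignal(Exception):
    """Hypothetical stand-in for a failure signal such as signals.FAIL."""


def risky_divide(a, b):
    if a == b:
        # Attach extra context to the exception before raising it.
        signal = FailSignal("a equaled b!")
        signal.flag = True
        signal.value = a
        raise signal
    return 1 / (b - a)


def describe_failure(exc):
    # getattr with a default keeps this safe for exceptions without the flag.
    if getattr(exc, "flag", False) is True:
        return "special failure: a == b == {}".format(exc.value)
    return "ordinary failure: {}".format(exc)


def run(a, b):
    try:
        return risky_divide(a, b)
    except Exception as exc:
        return describe_failure(exc)


print(run(1, 1))  # special failure: a == b == 1
print(run(1, 3))  # 0.5
```

The `getattr(..., False)` default matters because a handler sees every failure, including ones raised by code that never sets `flag`.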
import requests
from prefect.utilities.notifications import callback_factory


def send_post(task, state):
    msg = "Task {0} failed and is retrying at {1}".format(task, state.start_time)
    requests.post("https://XXXXX", json={"text": msg})


post_to_slack = callback_factory(send_post, lambda s: s.is_retrying())
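To make the factory pattern above concrete, here is a minimal pure-Python sketch of a function that builds a state handler from a callback and a check. It is an illustration of the idea under stated assumptions, not Prefect's implementation of `callback_factory`. `make_handler`, `FakeState`, and `sent` are hypothetical names, and the stub state only implements `is_retrying()`.

```python
from typing import Any, Callable


def make_handler(fn: Callable[[Any, Any], Any],
                 check: Callable[[Any], bool]) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler that calls fn(obj, new_state) when check(new_state) holds."""

    def state_handler(obj, old_state, new_state):
        if check(new_state):
            fn(obj, new_state)
        # Always pass the proposed state through unchanged.
        return new_state

    return state_handler


class FakeState:
    """Hypothetical stub that mimics only the is_retrying() check."""

    def __init__(self, retrying: bool) -> None:
        self._retrying = retrying

    def is_retrying(self) -> bool:
        return self._retrying


sent = []  # collects notifications instead of posting to a webhook
notify = make_handler(lambda obj, state: sent.append(obj),
                      lambda s: s.is_retrying())

retrying = FakeState(True)
finished = FakeState(False)

notify("task-a", None, finished)  # check fails, nothing is recorded
notify("task-b", None, retrying)  # check passes, callback fires
print(sent)  # ['task-b']
```

Separating the check from the callback keeps the callback free of state logic, so the same `send_post`-style function could be reused with a different condition.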