# 通知和状态处理器

告警，通知以及对task状态的动态响应是任何工作流工具的重要特性。使用Prefect原语，用户可以添加Prefect的触发器逻辑使得创建的指定task运行或失败后发送通知。这将起作用，但不能涵盖通知逻辑的更多细微用法（例如，如果task重试，则接收通知）。因此，Prefect引入了一种灵活的概念抽象，称为“状态处理器”，可以附加到各个task或flow。从高层次上讲，状态处理器是在附着对象的每次状态更改时调用的函数；它们可以用于在发生故障时发送警报，成功时发送电子邮件，或者根据新旧状态中包含的信息进行更细微的处理。

除了直接使用**state\_handler**API外，Prefect还提供了更高级的装饰器来实现常见的用例方案，例如故障回调。

> **Prefect状态**
>
> 状态处理器与Prefect的“状态”概念抽象密切相关。我们建议先阅读有关的[状态概念文档](https://docs.prefect.io/core/concepts/states.html)，然后再继续深入下去。

## 状态处理器

让我们从状态处理器的定义开始：

```python
def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
    """
    Any function with this signature can serve as a state handler.

    Args:
        - obj (Union[Task, Flow]): the underlying object to which this state handler
            is attached
        - old_state (State): the previous state of this object
        - new_state (State): the proposed new state of this object

    Returns:
        - Optional[State]: the new state of this object (typically this is just `new_state`)
    """
    pass
```

如你所见，状态处理器API是一种在每次状态更改时执行任意Python代码的简单方法。

一个简单的例子将阐明其用法：

```python
from prefect import Task, Flow

def my_state_handler(obj, old_state, new_state):
    msg = "\nCalling my custom state handler on {0}:\n{1} to {2}\n"
    print(msg.format(obj, old_state, new_state))
    return new_state

my_flow = Flow(name="state-handler-demo",
               tasks=[Task()],
               state_handlers=[my_state_handler])
my_flow.run()
```

忽略日志，应该输出：

```bash
Calling my custom state handler on <Flow: name=state-handler-demo>:
Scheduled() to Running("Running flow.")

Calling my custom state handler on <Flow: name=state-handler-demo>:
Running("Running flow.") to Success("All reference tasks succeeded.")
```

以完全相同的方式，我们可以将此状态处理器附加到单个task而不是flow：

```python
t = Task(state_handlers=[my_state_handler])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
```

再次忽略日志，这应该输出：

```bash
Calling my custom state handler on <Task: Task>:
Pending() to Running("Starting task run.")

Calling my custom state handler on <Task: Task>:
Running("Starting task run.") to Success("Task run succeeded.")
```

归根结底，仅此而已！但是，API的简单性掩盖了此特性的许多可能的使用模式，这就是我们接下来要介绍的内容。

> **注意**
>
> 为了简单起见，在本文档的其余部分中，我们将重点介绍task状态处理器，但是我们讨论的所有内容均适用于flow。

## 发送一个简单的通知

如我们上面的例子所示，拦截task状态并对其进行响应非常容易。实际上，我们前面的示例可以认为是一个简单的通知系统，在该系统中，我们将通知打印到stdout。让我们更进一步，编写一个通知程序，每当我们的task完成运行时，该通知程序就会向Slack发布更新。为了使此示例正常工作，需要传入Webhook进行Slack应用设置。如果不使用Slack或没有应用，则无需担心，只需将Slack URL与你可能会访问的任何其他web服务交换即可。

```python
import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state


t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run()
```

在这里，我们通过仅在task状态被视为完成时发送通知来响应状态。这包括**Success**状态和**Failed**状态，但不包括诸如**Retrying**或**Scheduled**状态。

> **通知失败会使task变成失败**
>
> 如果POST请求返回非200状态码，则可能引发错误。很好，但是要警告，Prefect将状态处理器视为task执行的组成部分，因此，如果在调用task的状态处理器时引发错误，则task实例运行将被中止，并且该task将被标记为**Failed**状态。

关于第三方通知API的认证。

> **状态处理器可以使用Prefect Secrets**
>
> 大多数通知系统将需要某种形式的身份认证。不要失望，状态处理器可以像task一样查询使用Prefect Secrets。

## **响应状态变化**

到目前为止，我们的大多数示例都会告知用户task何时会不会进入特定状态。Prefect的state对象包含丰富的数据，使我们可以发挥更大的创造力！让我们重新访问**post\_to\_slack**通知程序，让它在task进入**Retrying**状态时提醒我们，以及等待多久重试发生：

```python
import requests
from prefect import Task, Flow

def post_to_slack(task, old_state, new_state):
    if new_state.is_retrying():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)

        # replace URL with your Slack webhook URL
        requests.post("https://XXXXX", json={"text": msg})

    return new_state


t = Task(state_handlers=[post_to_slack])
flow = Flow(name="state-handler-demo", tasks=[t])
flow.run() # the notifier is never run
```

这例子使用**Retrying**状态的start\_time属性向用户发出更多有用的信息的警告。

> **创造性思考**
>
> 因为Prefect允许task返回数据，所以实际上我们可以使状态处理器根据task的输出进行响应。更加有趣的是，任何Prefect状态都可以携带数据，其中包括**Failed**状态。

让我们实现一个具有特殊失败模式的task。如果发生此故障模式，我们希望立即得到警告。

```python
from prefect import task, Flow
from prefect.engine import signals

def alert_on_special_failure(task, old_state, new_state):
    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            print("Special failure mode!  Send all the alerts!")
            print("a == b == {}".format(new_state.result.value))

    return new_state


@task(state_handlers=[alert_on_special_failure])
def mission_critical_task(a, b):
    if a == b:
        fail_signal = signals.FAIL("a equaled b!")
        fail_signal.flag = True
        fail_signal.value = a
        raise fail_signal
    else:
        return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = mission_critical_task(1, 1)

flow.run()
# Special failure mode!  Send all the alerts!
# a == b == 1
```

注意我们能复用这种将数据附加到跨越多个task的**FAIL**信号，因此，在其他情况也可以复用此状态处理器。

## 使用多个状态处理器

你可能已经注意到**state\_handlers**参数为复数形式，并可以接受列表。这是因为Prefect允许你根据需要将尽可能多的状态处理器附加到task上！此模式对于组合具有不同用例场景的状态处理器很有用（例如，用于失败的特殊处理器和用于重试的特殊处理器）。如果要在某些紧急情况下向你发出警报，它也很有用，你可以实现许多不同的状态处理器，以确保尽快收到警报。

> **依次调用状态处理器**
>
> 如果选择为一个task提供多个状态处理器，请注意将按照提供它们的顺序依次调用它们。

## 高级状态处理器API

Prefect提供了用于从更小、更模块化的部分创建状态处理器的工具。特别是，位于**prefect.utilities.notifications**中的**callback\_factory**helper的工具函数使你可以从两个简单的函数创建状态处理器，一个实现动作，另一个执行检查（或过滤）以确定何时应执行该动作。让我们使用此工具重新实现**post\_to\_slack**重试处理功能：

```python
from prefect.utilities.notifications import callback_factory

def send_post(task, state):
    msg = "Task {0} failed and is retrying at {1}".format(task, state.start_time)
    requests.post("https://XXXXX", json={"text": msg})

post_to_slack = callback_factory(send_post, lambda s: s.is_retrying())
```

你可以检查该状态处理器的行为是否与我们实现的第一个相同。这种工厂方法允许用户使用简单的API来混合和匹配逻辑部分。

常见检查是状态是否为失败。因此，Prefect提供了高级的API来构造失败回调。特别是给定带签名的函数。

```python
def f(obj: Union[Task, Flow], state: State) -> None
```

可以在task和flow中使用**on\_failure**关键字参数来自动创建适当的状态处理器。

* [Prefect官网](https://www.prefect.io/)
* [英版原文](https://docs.prefect.io/core/concepts/notifications.html)
* [联系译者](https://github.com/listen-lavender)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://listen-lavender.gitbook.io/prefect-docs/coreconcepts/notificationstatehandler.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
