# 本地调试

无论是使用**flow\.run()**&#x5728;本地运行Prefect，还是在使用Prefect Cloud将其投入生产之前对想法方案进行实验，Prefect都可以提供大量用于调试flow和诊断问题的工具。

## 使用FlowRunner进行无状态执行

如果问题与重试有关，或者想按计划运行flow，则可以首先考虑使用**run\_on\_schedule=False**重新运行flow。这可以通过环境变量**PREFECTFLOWSRUN\_ON\_SCHEDULE=false**或关键字**flow\.run(run\_on\_schedule=False)**&#x6765;完成。相反，如果想实现单个flow运行，请考虑直接使用**FlowRunner**：

```python
from prefect.engine.flow_runner import FlowRunner

# ... your flow construction

runner = FlowRunner(flow=my_flow)
flow_state = runner.run(return_tasks=my_flow.tasks)
```

无论其调度计划如何，将立即执行flow。注意默认情况下，FlowRunner不提供任何单独的task状态，因此，如果想要该信息，则必须使用**return\_tasks**关键字参数来请求它。

## 执行器选择

### LocalExecutor

选择执行器对于调试flow的难易程度是非常重要的。DaskExecutor依赖于多进程和多线程，这可能很难定位问题。当试图找出问题所在时，诸如共享状态和打印语句之类的东西会消失在虚空中，可能是一件真正的麻烦事。幸运的是，Prefect提供LocalExecutor，它是最简化的执行器-所有task都将立即执行，并且是在主进程中按照适当的顺序执行。

要换本地执行器，请按照示例进行操作：

```python
from prefect.engine.executors import LocalExecutor

# ... your flow construction

state = flow.run(executor=LocalExecutor()) # <-- the executor needs to be initialized
```

这样就设定了！executor关键字也可以提供给**FlowRunner.run**方法。

### LocalDaskExecutor

如果想升级为功能更强大的执行器，但仍保持易于调试的环境，则建议使用LocalDaskExecutor。该执行器确实使用dask推迟了计算，但是避免了任何并行性，从而使执行流水线更容易推理追踪。初始化此执行器时，可以通过设置**scheduler=threads**或**scheduler=processes**来打开并行性。

> **Prefect设置**
>
> 可以将LocalDaskExecutor设置为本地计算机上的默认执行器。要更改Prefect设置（包括默认执行器），可以：
>
> * 修&#x6539;**\~/.prefect/config.toml**配置文件
> * 更新OS环境变量；可以通过设置PREFECT**SECTION**SUBSECTION**KEY覆盖配置文件中的每个值。例如，要更改默认执行器，可以设置\*\*PREFECT**ENGINE**EXECUTOR**DEFAULT\_CLASS=prefect.engine.executors.LocalExecutor\*\*

### DaskExecutor

最后，如果问题实际上与并行性有关，则需要使用DaskExecutor。有两个初始化关键字参数，在调试时非常有用：

* local\_processes，它是一个布尔值，指定是否要使用多进程。此标志的默认值为False。尝试切换它以查看问题是否与多进程或多线程有关
* debug，这是另一个用于设置**dask.distributed**日志记录级别的布尔值； 默认值是从Prefect配置文件中设置的，并且应该为True，以从dask的日志中获取最详细的输出

## 运行时抛异常

有时不希望Prefect强大的错误处理机制捕获异常，你希望将它们引发，以便可以立即自己调试它们！发生错误时，使用**raise\_on\_exception**上下文管理器抛出异常：

```python
from prefect import Flow, task
from prefect.utilities.debug import raise_on_exception


@task
def div(x):
    return 1 / x

with Flow("My Flow") as f:
    val = div(0)

with raise_on_exception():
    f.run()

---------------------------------------------------------------------------

ZeroDivisionError                         Traceback (most recent call last)

<ipython-input-1-82c40dd24406> in <module>()
     11
     12 with raise_on_exception():
---> 13     f.run()


... # the full traceback is long

<ipython-input-1-82c40dd24406> in div(x)
      5 @task
      6 def div(x):
----> 7     return 1 / x
      8
      9 with Flow("My Flow") as f:


ZeroDivisionError: division by zero
```

现在可以使用自己喜欢的调试器进行跟踪并像往常一样继续进行。

> 提示
>
> 注意该实用工具不需要知道错误发生的位置。

## 事后异常重放

假设想让整个管道运行，并且不想在运行时引发陷阱错误。假设错误被捕获并置task于**Failed**状态，则完整的异常存储在task状态的**result**属性中。知道这一点，就可以在本地重新引发它并从那里进行调试！

展示：

```python
from prefect import Flow, task


@task
def gotcha():
    tup = ('a', ['b'])
    try:
        tup[1] += ['c']
    except TypeError:
        assert len(tup[1]) == 1


flow = Flow(name="tuples", tasks=[gotcha])

state = flow.run()
state.result # {<Task: gotcha>: Failed("Unexpected error: AssertionError()")}

failed_state = state.result[gotcha]
raise failed_state.result

---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

<ipython-input-50-8efcdf8dacda> in gotcha()
      7     try:
----> 8         tup[1] += ['c']
      9     except TypeError:

TypeError: 'tuple' object does not support item assignment

During handling of the above exception, another exception occurred:

AssertionError                            Traceback (most recent call last)
<ipython-input-1-f0f986d2f159> in <module>
     22
     23 failed_state = state.result[gotcha]
---> 24 raise failed_state.result

~/Developer/prefect/src/prefect/engine/runner.py in inner(self, state, *args, **kwargs)
     58
     59         try:
---> 60             new_state = method(self, state, *args, **kwargs)
     61         except ENDRUN as exc:
     62             raise_end_run = True

~/Developer/prefect/src/prefect/engine/task_runner.py in get_task_run_state(self, state, inputs, timeout_handler)
    697             self.logger.info("Running task...")
    698             timeout_handler = timeout_handler or main_thread_timeout
--> 699             result = timeout_handler(self.task.run, timeout=self.task.timeout, **inputs)
    700
    701         # inform user of timeout

~/Developer/prefect/src/prefect/utilities/executors.py in multiprocessing_timeout(fn, timeout, *args, **kwargs)
     68
     69     if timeout is None:
---> 70         return fn(*args, **kwargs)
     71     else:
     72         timeout_length = timeout.total_seconds()

<ipython-input-1-f0f986d2f159> in gotcha()
      9         tup[1] += ['c']
     10     except TypeError:
---> 11         assert len(tup[1]) == 1
     12
     13

AssertionError:

%debug # using the IPython magic method to start a pdb session
```

```bash
ipdb> tup
('a', ['b', 'c'])
ipdb> exit
```

啊哈！这就是在元组中放置可变对象所得到的！

## 调修损坏的flow

到目前为止，已经谈到了如何识别错误，但是如何解决损坏的flow呢？一种方法是从头开始创建flow实例重新运行逻辑。在生产中，对于如何创建flow有一个单一的绝对来源可能是个好主意。但是，当快速迭代或尝试建立概念验证时，这可能会令人沮丧。

尝试更新flow时，有两种方便的flow方法：

* **flow\.get\_tasks**用于检索满足某些条件的task
* **flow\.replace**准备好更新/修复的版本替换task

使用上面的错漏flow，让我们定义一个新task，并使用以下方法将其替换掉：

```python
@task
def fixed():
    tup = ('a', ('b',))
    try:
        tup[1] += ('c',)
    except TypeError:
        assert len(tup[1]) == 1


broken = flow.get_tasks(name="gotcha")[0]
flow.replace(broken, fixed)

flow.run() # Success("All reference tasks succeeded.")
```

如果有一个复杂的依赖关系图，**flow\.replace**可以成为真正的节省时间的工具，用于快速替换task，同时保留所有依赖关系！

> **调用签名**
>
> 注意**flow\.replace**保留原型，这意味着新旧task需要具有完全相同的调用签名。

## 本地检查flow的Docker存储

flow在生产中可能意外中断（或根本无法运行）的另一个原因是它的存储是否损坏（例如，如果忘记了为该flow定义Docker存储时的Python依赖关系）。幸运的是，在本地检查flow的存储很容易！让我们来看一个例子：

```python
from prefect import task, Flow
from prefect.environments.storage import Docker


# import a non-prefect package used for scraping reddit
import praw


@task
def whoami():
    reddit = praw.Reddit(client_id='SI8pN3DSbt0zor',
                         client_secret='xaxkj7HNh8kwg8e5t4m6KvSrbTI',
                         password='1guiwevlfo00esyy',
                         user_agent='testscript by /u/fakebot3',
                         username='fakebot3')
    return reddit.user.me()


storage = Docker(base_image="python:3.6", registry_url="http://my.personal.registry")
flow = Flow("reddit-flow", storage=storage, tasks=[whoami])
```

如果打算将此flow部署到Cloud，将不会再的到它的反馈。为什么？我们来了解一下，首先，在本地构建Docker存储而无需推送注册：

```python
# note that this will require either a valid registry_url, or no registry_url
# push=False is important here; otherwise your local image will be deleted
built_storage = flow.storage.build(push=False)
```

注意Docker输出的倒数第二行中包含的镜像ID：

```bash
Successfully built 0f3b0851148b # your ID will be different
```

现在，在本地计算机上有一个Docker容器，其中包含我们的flow。要访问它，我们首先确定flow的存储位置：

```bash
built_storage.flows
# {"reddit-flow": "/root/.prefect/reddit-flow.prefect"}
```

然后通过以下方式连接到容器：

```bash
# connect to an interactive python session running in the container
docker run -it 0f3b0851148b python
```

最后，我们可以使用cloudpickle将文件反序列化为flow对象：

```python
import cloudpickle

with open("/root/.prefect/reddit-flow.prefect", "rb") as f:
    flow = cloudpickle.load(f)
```

这将导致非常明确的回溯！

```python
Traceback (most recent call last):
    flow = cloudpickle.loads(decrypted_pickle)
  File "/usr/local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 944, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'praw'
```

在这种特殊情况下，我们忘了为Docker存储的python\_dependencies中包含praw；通常，这是确保flow顺利通过部署过程的一种方法。

> **更多知识点**
>
> 实际上，我们发现此过程非常有用，我们已为你自动化了！ Prefect现在在推送Docker镜像之前执行“健康状况检查”，该镜像实际上运行上述代码，并确保flow可在其容器中反序列化。但是，了解这种情况的机制仍然很有用。

* [Prefect官网](https://www.prefect.io/)
* [英版原文](https://docs.prefect.io/core/advanced_tutorials/local-debugging.html)
* [联系译者](https://github.com/listen-lavender)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://listen-lavender.gitbook.io/prefect-docs/advancetutorial/localdebug.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
