from prefect.engine.flow_runner import FlowRunner
# ... your flow construction
runner = FlowRunner(flow=my_flow)
flow_state = runner.run(return_tasks=my_flow.tasks)
from prefect.engine.executors import LocalExecutor
# ... your flow construction
state = flow.run(executor=LocalExecutor()) # <-- the executor needs to be initialized
from prefect import Flow, task
from prefect.utilities.debug import raise_on_exception
@task
def div(x):
return 1 / x
with Flow("My Flow") as f:
val = div(0)
with raise_on_exception():
f.run()
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
<ipython-input-1-82c40dd24406> in <module>()
11
12 with raise_on_exception():
---> 13 f.run()
... # the full traceback is long
<ipython-input-1-82c40dd24406> in div(x)
5 @task
6 def div(x):
----> 7 return 1 / x
8
9 with Flow("My Flow") as f:
ZeroDivisionError: division by zero
# note that this will require either a valid registry_url, or no registry_url
# push=False is important here; otherwise your local image will be deleted
built_storage = flow.storage.build(push=False)
注意Docker输出的倒数第二行中包含的镜像ID:
Successfully built 0f3b0851148b # your ID will be different
# connect to an interactive python session running in the container
docker run -it 0f3b0851148b python
最后,我们可以使用cloudpickle将文件反序列化为flow对象:
import cloudpickle
with open("/root/.prefect/reddit-flow.prefect", "rb") as f:
flow = cloudpickle.load(f)
这将导致非常明确的回溯!
Traceback (most recent call last):
flow = cloudpickle.loads(decrypted_pickle)
File "/usr/local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 944, in subimport
__import__(name)
ModuleNotFoundError: No module named 'praw'