import random
from prefect.triggers import all_successful, all_failed
from prefect import task, Flow
@task(name="Task A")
def task_a():
    """Fail at random: raise ``ValueError`` when ``random.random()`` exceeds 0.5.

    Raises:
        ValueError: when the random draw is greater than 0.5.
    """
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")
@task(name="Task B", trigger=all_successful)
def task_b():
    """Placeholder task registered with the ``all_successful`` trigger."""
    # do something interesting
    pass
@task(name="Task C", trigger=all_failed)
def task_c():
    """Placeholder task registered with the ``all_failed`` trigger."""
    # do something interesting
    pass
# Wire Task B and Task C downstream of Task A inside a single flow.
with Flow("Trigger example") as flow:
    success = task_b(upstream_tasks=[task_a])
    fail = task_c(upstream_tasks=[task_a])

## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])

flow.run()
Modifying context, even globally set context keys, is possible at specific times using a provided context manager:
使用提供的上下文管理器,可以在特定时刻修改上下文,甚至包括全局设置的上下文键:
>>> import prefect
>>> with prefect.context(a=2):
... print(prefect.context.a)
...
2
这对于在不同情况下运行flow以进行快速迭代开发通常很有用:
@task
def try_unlock():
    """Return True when the context value ``key`` equals ``'abc'``; otherwise fail.

    Returns:
        True when ``prefect.context.key == 'abc'``.

    Raises:
        signals.FAIL: when the context key holds any other value.
    """
    # NOTE(review): `signals` is not imported in the visible source —
    # presumably `from prefect.engine import signals`; confirm.
    if prefect.context.key == 'abc':
        return True
    else:
        raise signals.FAIL()
# Build a one-task flow whose outcome depends on the context value `key`.
with Flow('Using Context') as flow:
    try_unlock()

flow.run()  # this run fails

# Run the same flow again with key='abc' supplied through the context manager.
with prefect.context(key='abc'):
    flow.run()  # this run is successful
@task
def report_start_day():
    """Log ``prefect.context.today`` at INFO level using the context's logger."""
    logger = prefect.context.get("logger")
    logger.info(prefect.context.today)
# Single-task flow that runs report_start_day once.
with Flow('My flow') as flow:
    report_start_day()

flow.run()
[2020-03-02 22:15:58,779] INFO - prefect.FlowRunner | Beginning Flow run for 'My flow'
[2020-03-02 22:15:58,780] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-02 22:15:58,786] INFO - prefect.TaskRunner | Task 'report_start_day': Starting task run...
[2020-03-02 22:15:58,786] INFO - prefect.Task: report_start_day | 2020-03-02
[2020-03-02 22:15:58,788] INFO - prefect.TaskRunner | Task 'report_start_day': finished task run for task with final state: 'Success'
[2020-03-02 22:15:58,789] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded