# Trigger example: Task A fails at random, and two downstream tasks are
# declared with opposite triggers (all_successful / all_failed) on Task A.
import random

from prefect.triggers import all_successful, all_failed
from prefect import task, Flow


@task(name="Task A")
def task_a():
    """Raise ValueError whenever a fresh random draw exceeds 0.5.

    Raises:
        ValueError: on the runs where ``random.random() > 0.5``.
    """
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")


@task(name="Task B", trigger=all_successful)
def task_b():
    """Placeholder task declared with the ``all_successful`` trigger."""
    # do something interesting
    pass


@task(name="Task C", trigger=all_failed)
def task_c():
    """Placeholder task declared with the ``all_failed`` trigger."""
    # do something interesting
    pass


with Flow("Trigger example") as flow:
    # Both tasks depend on Task A; their triggers decide which one runs.
    success = task_b(upstream_tasks=[task_a])
    fail = task_c(upstream_tasks=[task_a])

## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])

flow.run()
>>> import prefect
>>> with prefect.context(a=2):
... print(prefect.context.a)
...
2
@task
def try_unlock():
    """Succeed only when the Prefect context holds ``key == 'abc'``.

    Returns:
        True when ``prefect.context.key`` equals ``'abc'``.

    Raises:
        signals.FAIL: when ``prefect.context.key`` has any other value.
    """
    # NOTE(review): when no `key` was placed in context, the attribute access
    # itself presumably raises before the comparison -- confirm.
    if prefect.context.key == 'abc':
        return True
    else:
        raise signals.FAIL()
with Flow('Using Context') as flow:
    try_unlock()

# No `key` is supplied through context for this run.
flow.run() # this run fails

# Supply key='abc' through context for the duration of the run.
with prefect.context(key='abc'):
    flow.run() # this run is successful
@task
def report_start_day():
    """Log ``prefect.context.today`` at INFO level via the logger stored in context."""
    # The logger is looked up from context under the "logger" key.
    logger = prefect.context.get("logger")
    logger.info(prefect.context.today)
# Build a flow containing the single reporting task, then run it.
with Flow('My flow') as flow:
    report_start_day()

flow.run()
[2020-03-02 22:15:58,779] INFO - prefect.FlowRunner | Beginning Flow run for 'My flow'
[2020-03-02 22:15:58,780] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-02 22:15:58,786] INFO - prefect.TaskRunner | Task 'report_start_day': Starting task run...
[2020-03-02 22:15:58,786] INFO - prefect.Task: report_start_day | 2020-03-02
[2020-03-02 22:15:58,788] INFO - prefect.TaskRunner | Task 'report_start_day': finished task run for task with final state: 'Success'
[2020-03-02 22:15:58,789] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded