# Trigger example: Task A fails at random, and two downstream tasks use
# triggers so that exactly one of them is meant to act on each outcome.
import random

from prefect import Flow, task
from prefect.triggers import all_failed, all_successful


@task(name="Task A")
def task_a():
    """Raise ``ValueError`` whenever a fresh ``random.random()`` draw exceeds 0.5.

    The task returns ``None`` when the draw is 0.5 or lower, so either outcome
    can occur on any given run.
    """
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")


@task(name="Task B", trigger=all_successful)
def task_b():
    """Placeholder task registered with the ``all_successful`` trigger."""
    # do something interesting
    pass


@task(name="Task C", trigger=all_failed)
def task_c():
    """Placeholder task registered with the ``all_failed`` trigger."""
    # do something interesting
    pass


with Flow("Trigger example") as flow:
    # Both tasks list Task A as their only upstream task; the trigger each one
    # was declared with decides whether it runs.
    success = task_b(upstream_tasks=[task_a])
    fail = task_c(upstream_tasks=[task_a])

## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])

flow.run()
# Context example: the same flow is run twice, once without and once with a
# ``key`` value placed in the Prefect context.
import prefect
from prefect import Flow, task
from prefect.engine import signals


@task
def try_unlock():
    """Return ``True`` when the Prefect context holds ``key == 'abc'``.

    Raises:
        signals.FAIL: when the context key is absent or has any other value.
    """
    # .get() is used so that a context without ``key`` reaches the explicit
    # FAIL signal below instead of erroring on the attribute lookup itself.
    if prefect.context.get("key") == "abc":
        return True
    raise signals.FAIL()


with Flow("Using Context") as flow:
    try_unlock()

flow.run()  # this run fails

with prefect.context(key="abc"):
    flow.run()  # this run is successful
[2020-03-02 22:15:58,779] INFO - prefect.FlowRunner | Beginning Flow run for 'My flow'
[2020-03-02 22:15:58,780] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-02 22:15:58,786] INFO - prefect.TaskRunner | Task 'report_start_time': Starting task run...
[2020-03-02 22:15:58,786] INFO - prefect.Task: report_start_day | 2020-03-02
[2020-03-02 22:15:58,788] INFO - prefect.TaskRunner | Task 'report_start_time': finished task run for task with final state: 'Success'
[2020-03-02 22:15:58,789] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded