Execution.执行
在Prefect引擎中,用户可以通过多种方式来影响执行。
触发器
触发函数根据上游task的状态决定task是否准备就绪。除非触发器通过,否则高级task将无法运行。
默认情况下,task具有all_successful触发器,这意味着除非所有上游task都成功,否则它们将不会运行。通过更改触发函数,可以控制task相对于上游task的行为。其他触发器包括all_failed,any_successful,any_failed,all_finished和manual_only。这些可用于创建仅在前面的task失败时运行、或者一定运行、或者根本不会自动运行的task!
仅当所有上游task都处于Finished状态时,task才会执行其触发器。因此,all_finished触发器与always_run触发器相同。
使用manual_only触发器暂停flow
manual_only始终将task插入Paused状态,因此该flow永远不会自动运行manual_only task。这允许用户在运行中暂停flow。要恢复,请将task插入Resume状态,这将被视为没有上游task的根task,并完全跳过触发器函数的执行检查。
例如,假设我们要用一个根task构造一个flow;如果此task成功,则我们要运行task B。如果失败,我们将要运行task C。我们可以通过使用触发器来完成此模式:
import random
from prefect.triggers import all_successful, all_failed
from prefect import task, Flow
@task(name="Task A")
def task_a():
if random.random() > 0.5:
raise ValueError("Non-deterministic error has occured.")
@task(name="Task B", trigger=all_successful)
def task_b():
# do something interesting
pass
@task(name="Task C", trigger=all_failed)
def task_c():
# do something interesting
pass
with Flow("Trigger example") as flow:
success = task_b(upstream_tasks=[task_a])
fail = task_c(upstream_tasks=[task_a])
## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that it's state uniquely determines the overall Flow state
flow.set_reference_tasks([success])
flow.run()
状态信号
Prefect会尽力推断正在运行的task的状态。如果.run()方法成功结束,则Prefect会将状态设置为Success,并记录所有返回的数据。如果.run()方法引发错误,则Prefect会通过相应的消息将状态设置为Failed。
有时,你可能希望对task状态进行更好的控制。例如,你可能希望跳过task或强制重试task。在那种场景下,请发出适当的Prefect信号。它将被拦截并转换为适当的状态。
Prefect为大多数状态提供信号,包括RETRY、SKIP、FAIL、SUCCESS和PAUSE。
from prefect import task
from prefect.engine import signals
def retry_if_negative(x):
if x < 0:
raise signals.RETRY()
else:
return x
Prefect信号的另一种常见用法是,将讨论的task嵌套在其他可能会捕获其正常结果或错误的函数里。在那种情况下,可以使用一个信号来“冒泡”所需的状态变化,并绕过正常的返回机制。
上下文
Prefect提供一个强大的Context对象,在task的.run()方法上不使用显式参数的情况下,可以共享信息。
Prefect会在flow和task执行期间向可以随时访问的上下文填充信息。上下文对象本身可以填充任意的用户定义的键值对,然后可以使用A.key对象方式或类似字典的A[key]索引方式对其进行访问。例如,以下代码假定用户提供上下文{a=1},并且可以使用以下三种方式之一进行访问:
>>> import prefect
>>> prefect.context.a
1
>>> prefect.context['a']
1
>>> prefect.context.get('a')
1
添加全局上下文
可以通过config.toml的[context]部分在全局范围内添加上下文。对于此处指定的任何键,只要Prefect Core不会在内部覆盖此键,就可以从prefect.context全局访问它,甚至可以在flow运行实例之外访问。
# config.toml
[context]
a = 1
>>> import prefect
>>> prefect.context.a
1
运行时修改上下文
修改上下文,甚至全局修改上下文键,在特殊时候能用 Modifying context, even globally set context keys, at specific times is possible using a provided context manager:
使用提供的上下文管理器可以在特定时间修改上下文,甚至可以全局设置上下文密钥:
>>> import prefect
>>> with prefect.context(a=2):
... print(prefect.context.a)
...
2
这对于在不同情况下运行flow以进行快速迭代开发通常很有用:
@task
def try_unlock():
if prefect.context.key == 'abc':
return True
else:
raise signals.FAIL()
with Flow('Using Context') as flow:
try_unlock()
flow.run() # this run fails
with prefect.context(key='abc'):
flow.run() # this run is successful
Prefect提供的上下文
除了设置上下文键参数之外,Prefect还可以在flow实例运行和task实例运行期间动态地将上下文提供给上下文对象。该上下文提供有关当前flow或task的一些标准信息。例如,正在运行的task已经从Prefect提供的上下文中知道了它们的运行日期:
@task
def report_start_day():
logger = prefect.context.get("logger")
logger.info(prefect.context.today)
with Flow('My flow') as flow:
report_start_day()
flow.run()
[2020-03-02 22:15:58,779] INFO - prefect.FlowRunner | Beginning Flow run for 'My flow'
[2020-03-02 22:15:58,780] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-02 22:15:58,786] INFO - prefect.TaskRunner | Task 'report_start_time': Starting task run...
[2020-03-02 22:15:58,786] INFO - prefect.Task: report_start_day | 2020-03-02
[2020-03-02 22:15:58,788] INFO - prefect.TaskRunner | Task 'report_start_time': finished task run for task with final state: 'Success'
[2020-03-02 22:15:58,789] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
使用此上下文对于编写可感知时间的task有帮助,例如使用prefect.context.tomorrow触发与开始时间有关的未来作业的task,或者使用prefect.context.yesterday仅处理前一天的数据。
上下文中还有什么?
有关可以在上下文中找到的值的详尽列表,请参见相应的API文档。
还有些要注意的。
修改Prefect提供的上下文的提醒
由于Prefect内部使用某些上下文在flow和task运行逻辑期间跟踪元数据,因此修改Prefect提供的上下文键值可能会产生意想不到的后果。通常建议避免覆盖API文档中描述的保留键名。
与此相关的一个例外是与时间戳相关的键,例如prefect.context.today。用户可能希望在每个flow运行实例中修改此上下文,以实现信息回填,其中各个flow运行实例在时间序列数据的子集上执行。
Last updated
Was this helpful?