Flow.流

概览

flow是task的容器。它通过描述task之间的依赖性来表示整个工作流程或应用。

flow是DAG,即“有向无环图”。这是描述某些组织原则的数学方法:

  • 图是一种包含边、顶点、由边连接顶点的数据结构。Prefect将每个flow建模为一个图,其中边建模task依赖关系。

  • 有向图表示两个task连接的边具有起点和终点:一个task先执行,然后再执行另一个task。

  • 有向无环图没有环形依赖:如果遍历该图,将永远不会重复遍历相同的task。

API

函数式API

构建Prefect管道的最便捷方法是使用函数式API。只要进入flow上下文,就可使用函数式API。在这种模式下,你可以把它们当做函数一样,从一个task调用另一个task,并且Prefect将通过适当修改flow在后台建立计算图。

例如:

from prefect import task, Task, Flow
import random

@task
def random_number():
    return random.randint(0, 100)

@task
def plus_one(x):
    return x + 1

with Flow('My Functional Flow') as flow:
    r = random_number()
    y = plus_one(x=r)

将子类化task与函数式API结合使用

请注意,为了通过函数式API使用Task子类(与@task装饰器相反),需要在调用该类之前实例化该类:

class PlusOneTask(Task):
    def run(self, x):
        return x + 1

with Flow('Plus One Flow'):
    task = PlusOneTask() # first create the Task instance
    result = task(10) # then call it with arguments

实例化时设置包括task的retry_delaytriggercaching机制的属性。使用函数式API,这些属性可以作为参数传递给@task装饰器。

命令式API

Prefect的命令式API可以实现更细粒度的控制。与函数式API相比,它的主要优点是无需传递其结果的条件下允许将task设置为上游或下游依赖。这使得无需创建数据依赖,就可以通过状态依赖创建严格的有序task。

from prefect import Task, Flow

class RunMeFirst(Task):
    def run(self):
        print("I'm running first!")

class PlusOneTask(Task):
    def run(self, x):
        return x + 1

flow = Flow('My Imperative Flow')
plus_one = PlusOneTask()
flow.set_dependencies(
    task=plus_one,
    upstream_tasks=[RunMeFirst()],
    keyword_tasks=dict(x=10))

flow.visualize()

提醒

flow.set_dependencies()task.set_dependencies()(后者仅在活动的flow上下文中可用)是命令式API的主要入口点。flow还提供一些底层方法,例如.add_task().add_edge(),可用于直接操作工作流计算图。

执行flow

通过调用flow.run()执行工作流:

from prefect import task, Flow

@task
def say_hello():
    print("Hello, world!")

with Flow("Run Me") as flow:
    h = say_hello()

flow.run() # prints "Hello, world!"

这将返回一个代表运行结果的State对象,包括所有的task状态。

state = flow.run()
state.result[h] # the task state of the say_hello task

调度计划

Prefect将flow当做函数一样,这意味着它们可以出于任何原因在任何时间设置任何并发度来运行。

但是,flow也可能有调度计划。用Prefect术语说,调度计划无非是一种指定要在特定时间开始新的运行的方式。即使flow有调度计划,仍然可以手动执行它。

更多信息参见调度计划概念文档

通过调度计划执行flow

如果为附有调度计划的flow调用flow.run(),则将按调度计划运行该flow。注意,它将等待下一个调度时间点开始执行,而不是立即开始运行。

flow.run不支持并发flow的运行

flow.run()是按调度计划运行flow的便捷方法,但是它不支持并发flow的运行。在开始下一次flow运行之前,它将等待flow运行完全结束,包括flow需要重试的task等。但是,Prefect调度计划永远不会返回一个过去的开始时间。这意味着,如果在应该开始运行一个flow的新实例时flow的旧实例仍在运行,则flow的该调度时间点的实例将根本不会发生。如果需要在本地进程中并行运行,请考虑直接使用底层的FlowRunner类API。

关键task

终结者task

flow的终结者task是指没有下游依赖的task,它们是最后运行的task。

在flow的所有终结者task完成之前,不会将其视为Finished,否则将保持表示Running状态。默认情况下,终结者task也是flow的状态关联task,因此可以决定其状态。

执行顺序

Prefect除了上游依赖task状态确定之前不会运行后续的task,不保证flow的task的执行顺序。因此,可能有一个终端task实际上在流程中的其他task之前运行,只要它不依赖于那些task即可。

状态关联task

flow运行时,其状态由其状态关联task的结果状态决定。默认情况下,flow的状态关联task是其终结者task,这包括没有下游task的任何task。如果状态关联task全部成功(包括任何跳过的task),则该flow被视为Success。如果任何状态关联task失败,则将flow视为Failed。无论状态关联task处于什么状态,如果任何task未完成,都将其视为Pending

with Flow('Reference Task Flow') as flow:
    a, b, c = Task(), Task(), Task()
    flow.add_edge(a, b)
    flow.add_edge(b, c)

# by default, the reference tasks are the terminal tasks
assert flow.reference_tasks() == {c}

什么时候应该自定义状态关联task?

通常,flow的终结者task是默认的合适的状态关联task。但是,有时并非如此。

考虑包括几个task的flow,具有一个仅在主task(业务task)失败时才运行的下游task,用于清理环境释放资源。如果主task失败并且清理task成功,那么flow整体是否算成功?对于某些用户,答案是肯定的:清理操作按预期进行。对于另一些用户,答案是否定的:没有实现flow的主要业务目的。 自定义状态关联task允许你能更改此行为以适合你的场景需求。

序列化

flow元数据可以通过调用flow.serialize()方法进行序列化。

task检索

flow可能包含很多task,要找到所需的确切task可能会很困难。幸运的是,get_tasks方法使这一过程变得更加简单。将task的各种标识符的任何一个传递给该函数,它将检索出任何匹配的task。

# any tasks with the name "my task"
flow.get_tasks(name="my task")

# any tasks with the name "my task" and the "blue" tag
flow.get_tasks(name="my task", tags=["blue"])

# the task with the slug "x"
flow.get_tasks(slug="x")

状态处理器

状态处理器允许用户提供自定义逻辑,该逻辑在flow更改状态时将触发。例如,如果flow失败,你可以发送Slack通知。我们将其包含在此处,这很有用!

状态处理器必须具有以下签名:

state_handler(flow: Flow, old_state: State, new_state: State) -> State

每当flow的状态发生变化时,都会调用该处理程序,并接收flow本身、旧状态和新状态作为参数。状态处理器返回的状态用作flow的新状态。

If multiple handlers are provided, they are called in sequence. Each one will receive the "true" old_state and the new_state generated by the previous handler.

如果提供了多个状态处理器,则会依次调用它们。每个处理器都会收到由上一个处理器生成的trueold_statenew_state

Last updated