Flow.流
概览
flow是task的容器。它通过描述task之间的依赖性来表示整个工作流程或应用。
flow是DAG,即“有向无环图”。这是描述某些组织原则的数学方法:
图是一种包含边、顶点、由边连接顶点的数据结构。Prefect将每个flow建模为一个图,其中边建模task依赖关系。
有向图表示两个task连接的边具有起点和终点:一个task先执行,然后再执行另一个task。
有向无环图没有环形依赖:如果遍历该图,将永远不会重复遍历相同的task。
API
函数式API
构建Prefect管道的最便捷方法是使用函数式API。只要进入flow上下文,就可使用函数式API。在这种模式下,你可以把它们当做函数一样,从一个task调用另一个task,并且Prefect将通过适当修改flow在后台建立计算图。
例如:
将子类化task与函数式API结合使用
请注意,为了通过函数式API使用Task子类(与@task装饰器相反),需要在调用该类之前实例化该类:
实例化时设置包括task的retry_delay、trigger和caching机制的属性。使用函数式API,这些属性可以作为参数传递给@task装饰器。
命令式API
Prefect的命令式API可以实现更细粒度的控制。与函数式API相比,它的主要优点是无需传递其结果的条件下允许将task设置为上游或下游依赖。这使得无需创建数据依赖,就可以通过状态依赖创建严格的有序task。
提醒
flow.set_dependencies()和task.set_dependencies()(后者仅在活动的flow上下文中可用)是命令式API的主要入口点。flow还提供一些底层方法,例如.add_task()和.add_edge(),可用于直接操作工作流计算图。
执行flow
通过调用flow.run()执行工作流:
这将返回一个代表运行结果的State对象,包括所有的task状态。
调度计划
Prefect将flow当做函数一样,这意味着它们可以出于任何原因在任何时间设置任何并发度来运行。
但是,flow也可能有调度计划。用Prefect术语说,调度计划无非是一种指定要在特定时间开始新的运行的方式。即使flow有调度计划,仍然可以手动执行它。
更多信息参见调度计划概念文档。
通过调度计划执行flow
如果为附有调度计划的flow调用flow.run(),则将按调度计划运行该flow。注意,它将等待下一个调度时间点开始执行,而不是立即开始运行。
flow.run不支持并发flow的运行
flow.run()是按调度计划运行flow的便捷方法,但是它不支持并发flow的运行。在开始下一次flow运行之前,它将等待flow运行完全结束,包括flow需要重试的task等。但是,Prefect调度计划永远不会返回一个过去的开始时间。这意味着,如果在应该开始运行一个flow的新实例时flow的旧实例仍在运行,则flow的该调度时间点的实例将根本不会发生。如果需要在本地进程中并行运行,请考虑直接使用底层的FlowRunner类API。
关键task
终结者task
flow的终结者task是指没有下游依赖的task,它们是最后运行的task。
在flow的所有终结者task完成之前,不会将其视为Finished,否则将保持表示Running状态。默认情况下,终结者task也是flow的状态关联task,因此可以决定其状态。
执行顺序
Prefect除了上游依赖task状态确定之前不会运行后续的task,不保证flow的task的执行顺序。因此,可能有一个终端task实际上在流程中的其他task之前运行,只要它不依赖于那些task即可。
状态关联task
flow运行时,其状态由其状态关联task的结果状态决定。默认情况下,flow的状态关联task是其终结者task,这包括没有下游task的任何task。如果状态关联task全部成功(包括任何跳过的task),则该flow被视为Success。如果任何状态关联task失败,则将flow视为Failed。无论状态关联task处于什么状态,如果任何task未完成,都将其视为Pending。
什么时候应该自定义状态关联task?
通常,flow的终结者task是默认的合适的状态关联task。但是,有时并非如此。
考虑包括几个task的flow,具有一个仅在主task(业务task)失败时才运行的下游task,用于清理环境释放资源。如果主task失败并且清理task成功,那么flow整体是否算成功?对于某些用户,答案是肯定的:清理操作按预期进行。对于另一些用户,答案是否定的:没有实现flow的主要业务目的。 自定义状态关联task允许你能更改此行为以适合你的场景需求。
序列化
flow元数据可以通过调用flow.serialize()方法进行序列化。
task检索
flow可能包含很多task,要找到所需的确切task可能会很困难。幸运的是,get_tasks方法使这一过程变得更加简单。将task的各种标识符的任何一个传递给该函数,它将检索出任何匹配的task。
状态处理器
状态处理器允许用户提供自定义逻辑,该逻辑在flow更改状态时将触发。例如,如果flow失败,你可以发送Slack通知。我们将其包含在此处,这很有用!
状态处理器必须具有以下签名:
每当flow的状态发生变化时,都会调用该处理程序,并接收flow本身、旧状态和新状态作为参数。状态处理器返回的状态用作flow的新状态。
If multiple handlers are provided, they are called in sequence. Each one will receive the "true" old_state and the new_state generated by the previous handler.
如果提供了多个状态处理器,则会依次调用它们。每个处理器都会收到由上一个处理器生成的true、old_state和new_state。
Last updated