Prefect的丰富状态系统允许在运行时动态更改基础DAG结构进行工作流变形,同时仍提供基础工作流所有的原本保证,单个task可以具有自定义重试设置,交换数据,激活通知等。
以前,task映射允许用户在运行时将可并行化的for循环提升为一等公民的并行管道。task循环提供了几乎相同的好处,除了对于需要使用while循环模式的情况。
斐波那契数列的例子
作为一个鼓励例子,让我们实现一个Prefect flow,该流程计算小于给定数M的最大斐波那契数。对于此示例,我们将考虑斐波那契数的计算是一个只能通过API访问的完全黑盒。我们的实现将通过迭代查询下一个数字来进行,直到达到一个大于M的数字为止。将这种模式表达为工作流引出一个独特的问题,因为我们在运行时之前不知道需要多少个task。为了说明,请考虑斐波那契task的以下实现:
import requests
from prefect import task
@task
def compute_large_fibonacci(M):
n = 1
fib = 1
while True:
next_fib = requests.post(
"https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
).json()
if next_fib > M:
break
else:
fib = next_fib
n += 1
return fib
假设我们的互联网连接短暂中断并且此task失败。哪个n值引起了故障,如何从n重新启动?为了捕获此类间歇性问题,我们可能考虑为该task提供重试设置,但这将导致整个循环重新运行。如果将每个n值单独都视为具有其自身设置(重试,通知等)的Prefect Task,这是否理想?
task循环介绍
目标是将上述while循环的每个单独迭代提升到其自己的Prefect task,该Prefect task可以自己重试和处理。我们无法提前知道需要多少次迭代。幸运的是,Prefect以一等公民的方式支持这种动态模式。
从一个迭代到下一个迭代需要传递两段信息,循环计数以及循环有效负载(可以在各个迭代之间累积或更改)。为了传达此信息,我们将使用Prefect LOOP信号,如下所示:
import requests
from datetime import timedelta
import prefect
from prefect import task
from prefect.engine.signals import LOOP
@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
# we extract the accumulated task loop result from context
loop_payload = prefect.context.get("task_loop_result", {})
n = loop_payload.get("n", 1)
fib = loop_payload.get("fib", 1)
next_fib = requests.post(
"https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
).json()
if next_fib > M:
return fib # return statements end the loop
raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))
像所有Prefect信号一样,LOOP信号接受message和result关键字。但是,在这种情况下,结果将包含在task_loop_result键的上下文中,并且可在下一次循环迭代时可用(task_loop_count也可用,但此处不需要该信息)。
拼凑完整
让我们以刚刚学到的内容为基础,将各部分放到一个实际的Prefect flow中。
from prefect import Flow, Parameter
with Flow("fibonacci") as flow:
M = Parameter("M")
fib_num = compute_large_fibonacci(M)
作为最佳实践,我们选择将M提升为Prefect参数,而不是对其值进行硬编码。这样,我们可以尝试使用较小的值,并最终在不重新编译Flow的情况下动态设置该值。
借助我们构建的flow,让我们计算小于100的最大斐波纳契数,然后计算1000!
flow_state = flow.run(M=100)
print(flow_state.result[fib_num].result) # 89
flow_state = flow.run(M=1000)
print(flow_state.result[fib_num].result) # 987
如果在本地运行此flow,将看到大量的日志,它们对应于compute_large_fibonacci task的每个迭代,如果这些单个迭代中的任何一个失败,则延迟2秒后task将重试,而成功的迭代无需再次运行!
进一步
Prefect中的循环是一等公民的操作。因此,它可以与映射结合使用,以实现真正的动态工作流!
with Flow("mapped-fibonacci") as mapped_flow:
ms = Parameter("ms")
fib_nums = compute_large_fibonacci.map(ms)
需要明确的是,我们编写的flow似乎只有两个task,不知道我们可能提供多少个M值,也不知道每个M都需要多少次迭代!
flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
print(flow_state.result[fib_nums].result) # [8, 89, 987, 987]