调度执行

跟随终端演示:

cd examples/tutorial
python 05_schedules.py

现在,航班数据ETL工作流流已经足够值得信赖,我们希望能够按计划持续运行它。Prefect提供了可以附加到flow的Schedule对象,来设置调度计划:

from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule

# ... task definitions ...

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1),
)

with Flow("Aircraft-ETL", schedule=schedule) as flow:
    airport = Parameter("airport", default="IAD")
    radius = Parameter("radius", default=200)

    reference_data = extract_reference_data()
    live_data = extract_live_data(airport, radius, reference_data)

    transformed_live_data = transform(live_data, reference_data)

    load_reference_data(reference_data)
    load_live_data(transformed_live_data)

当调用flow.run()时,我们的工作流将永远不会停止,总是每分钟开始一次新的执行。

Schedules附加信息。

有多种方法可以配置调度计划以满足各种需求。有关Schedules的更多信息,请参见后续文档

接下来,轻松并发执行task。

Last updated