Schedule.调度计划
概览
Prefect假设可以出于任何原因随时运行flow实例。但是,在指定时间自动执行flow实例通常很有用。可以通过schedule关键字参数简单的将调度计划附加到flow。对于更详细或更复杂的调度计划,Prefect提供通用的schedule对象,该对象可进行细微的日期时间调整和过滤,以及根据计划的时间更新参数值。
简单调度计划
可以通过schedule关键字参数简单的将调度计划附加到flow:
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule
@task
def say_hello():
print("Hello, world!")
schedule = IntervalSchedule(interval=timedelta(minutes=2))
with Flow("Hello", schedule) as flow:
say_hello()
flow.run()
你可以在此示例中看到cron计划。
复杂调度计划
Prefect调度计划包含三个组件:
发出事件的时钟。例如,一个IntervalClock可能每小时发出一个事件;一个CronClock可以根据cron定时类型字符串发出事件;单个调度计划可能包含多个时钟。时钟还可以用于为每个flow运行实例指定不同的参数值
决定是否应包含事件的过滤器。例如,可能设置了仅允许在工作日或在工作时间内发送事件的过滤器
可通过过滤器修改事件的调整。例如,一个调整可以将事件延迟到下一个工作日或该月的最后一个工作日。
这三个组件允许用户将简单函数组合为复杂行为。
时间间隔时钟
最基本的Prefect时钟是IntervalClock。它采用一个时间间隔参数,并定期发出事件。可以设置可选参数start_date,在这种情况下,时间间隔是相对于该日期的,还可以提供可选参数end_date。
Prefect不支持小于或等于分钟级别的计划。
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
schedule = Schedule(clocks=[IntervalClock(timedelta(hours=24))])
schedule.next(5)
时区
想要将调度计划固定在某个时区吗?为你的时钟指定一个与该时区相对应的start_date,例如:
schedules.clocks.IntervalClock( start_date=pendulum.datetime(2019, 1, 1, tz="America/New_York"), interval=timedelta(days=1) )
关于夏令时。
夏令时
如果IntervalClock的开始时间带有DST观察时区,则调度计划将进行适当的调整。大于24小时的时间间隔将遵循DST标准,而小于24小时的时间间隔将遵循UTC标准。例如,一个小时的调度计划将在每个UTC小时触发一次,甚至跨越DST边界。当时钟回拨时,这将导致两个运行实例似乎都安排在本地时间凌晨1点进行,即使它们与UTC时间相隔一个小时。对于较长的时间间隔,例如每日调度计划,时间间隔调度计划将针对DST边界进行调整,以使时钟的小时保持不变。这意味着始终在上午9点触发的每日调度计划会遵守DST标准,并会继续在当地时区的上午9点触发。
注意此行为与CronClock不同。
时间点定时时钟
时钟也可以通过用cron定时类型字符串设置Prefect的CronClock来生成:
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
schedule = Schedule(clocks=[CronClock("0 0 * * *")])
schedule.next(5)
夏令时
如果CronClock的开始时间带有DST观察时区,则调度计划将自行调整。Cron的DST规则基于时钟时间,而不是时间间隔。这意味着每小时cron计划将在每个新的小时的时间点而非每隔一个小时触发。例如,如果将时钟回拨,这将导致两个小时的时间点触发暂停,因为调度计划将在120分钟后的第一次凌晨1点触发和第一次凌晨2点触发。较长的调度计划(例如每天早上9点触发的调度计划)会自动调整DST。
注意此行为与IntervalClock不同。
日期时钟
对于更不同的场景的调度计划,Prefect提供一个DatesClock,仅在用户指定的特定日期触发。
from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock
schedule = Schedule(
clocks=[DatesClock([pendulum.now().add(days=1), pendulum.now().add(days=2)])])
schedule.next(2)
调度计划产生Prefect参数(0.9.2以上版本支持)
所有时钟都支持可选参数parameter_defaults,该参数允许用户为由此时钟生成的每个flow运行实例指定不同的Prefect参数。例如,假设我们有以下flow记录传递给它的Parameter的值:
import prefect
from prefect import task, Flow, Parameter
@task
def log_param(p):
logger = prefect.context['logger']
logger.info("Received parameter value {}".format(p))
p = Parameter("p", default=None, required=False)
with Flow("Varying Parameters") as flow:
log_param(p)
每次运行此flow时,我们都可以选择为Prefect参数p传递一个新值,如果要按固定的调度计划运行flow,则可能要根据调用哪个调度计划为p设定不同的值,我们可以通过使用时钟来做到这一点:
import datetime
from prefect.schedules import clocks, Schedule
now = datetime.datetime.utcnow()
clock1 = clocks.IntervalClock(start_date=now,
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 1"})
clock2 = clocks.IntervalClock(start_date=now + datetime.timedelta(seconds=30),
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 2"})
# the full schedule
schedule = Schedule(clocks=[clock1, clock2])
flow.schedule = schedule # set the schedule on the Flow
flow.run()
当按上述调度计划运行此flow时,每次新的实例运行都会在日志中看到Prefect参数值变化:
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 2
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 1
...
过滤器
Prefect提供了丰富多样的事件过滤器,包括:
on_datetime(允许在特定日期时间发送事件)
on_date(允许在特定日期发送事件,例如3月15日)
at_time(允许在特定时间发送事件,例如下午3:30)
between_datetimes(允许两个特定日期时间之间发送事件)
between_times(允许两个事件之间发送事件,例如上午9点至下午5点)
between_dates(允许两个日历日期之间发送事件,例如1月1日至3月31日)
is_weekday(允许在工作日发送事件)
is_weekend(允许在周末发送事件)
is_month_end(允许在月末发送事件)
Filters can be provided to schedules in three different ways: 可以通过三种不同的方式向计划提供过滤器:
filters:所有过滤器必须返回True才能发送事件
or_filters:至少有一个filter返回True才能发送事件
not_filters:所有过滤器必须返回False才能发送事件
schedules.Schedule(
# fire every hour
clocks=[clocks.IntervalClock(timedelta(hours=1))],
# but only on weekdays
filters=[filters.is_weekday],
# and only at 9am or 3pm
or_filters=[
filters.between_times(pendulum.time(9), pendulum.time(9)),
filters.between_times(pendulum.time(15), pendulum.time(15)),
],
# and not in January
not_filters=[filters.between_dates(1, 1, 1, 31)]
)
调整
通过调整,调度计划可以修改时钟发出的日期,并传入多个过滤器:
添加(为日期添加间隔)
next_weekday(将日期延迟到下一个工作日)
schedules.Schedule(
# fire every day
clocks=[clocks.IntervalClock(timedelta(days=1))],
# filtered for month ends
filters=[filters.is_month_end],
# and run on the next weekday
adjustments=[adjustments.next_weekday]
)
Last updated
Was this helpful?