import datetime
from prefect.schedules import clocks, Schedule
now = datetime.datetime.utcnow()
clock1 = clocks.IntervalClock(start_date=now,
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 1"})
clock2 = clocks.IntervalClock(start_date=now + datetime.timedelta(seconds=30),
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 2"})
# the full schedule
schedule = Schedule(clocks=[clock1, clock2])
flow.schedule = schedule # set the schedule on the Flow
flow.run()
当按上述调度计划运行此flow时,每次新的实例运行都会在日志中看到Prefect参数值变化:
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 2
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 1
...
过滤器
Prefect提供了丰富多样的事件过滤器,包括:
on_datetime(允许在特定日期时间发送事件)
on_date(允许在特定日期发送事件,例如3月15日)
at_time(允许在特定时间发送事件,例如下午3:30)
between_datetimes(允许两个特定日期时间之间发送事件)
between_times(允许两个事件之间发送事件,例如上午9点至下午5点)
between_dates(允许两个日历日期之间发送事件,例如1月1日至3月31日)
is_weekday(允许在工作日发送事件)
is_weekend(允许在周末发送事件)
is_month_end(允许在月末发送事件)
Filters can be provided to schedules in three different ways: 可以通过三种不同的方式向计划提供过滤器:
filters:所有过滤器必须返回True才能发送事件
or_filters:至少有一个filter返回True才能发送事件
not_filters:所有过滤器必须返回False才能发送事件
schedules.Schedule(
# fire every hour
clocks=[clocks.IntervalClock(timedelta(hours=1))],
# but only on weekdays
filters=[filters.is_weekday],
# and only at 9am or 3pm
or_filters=[
filters.between_times(pendulum.time(9), pendulum.time(9)),
filters.between_times(pendulum.time(15), pendulum.time(15)),
],
# and not in January
not_filters=[filters.between_dates(1, 1, 1, 31)]
)
调整
通过调整,调度计划可以修改时钟发出的日期,并传入多个过滤器:
添加(为日期添加间隔)
next_weekday(将日期延迟到下一个工作日)
schedules.Schedule(
# fire every day
clocks=[clocks.IntervalClock(timedelta(days=1))],
# filtered for month ends
filters=[filters.is_month_end],
# and run on the next weekday
adjustments=[adjustments.next_weekday]
)