from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
schedule = Schedule(clocks=[CronClock("0 0 * * *")])
schedule.next(5)
from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock
schedule = Schedule(
clocks=[DatesClock([pendulum.now().add(days=1), pendulum.now().add(days=2)])])
schedule.next(2)
import prefect
from prefect import task, Flow, Parameter
@task
def log_param(p):
logger = prefect.context['logger']
logger.info("Received parameter value {}".format(p))
p = Parameter("p", default=None, required=False)
with Flow("Varying Parameters") as flow:
log_param(p)
import datetime
from prefect.schedules import clocks, Schedule
now = datetime.datetime.utcnow()
clock1 = clocks.IntervalClock(start_date=now,
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 1"})
clock2 = clocks.IntervalClock(start_date=now + datetime.timedelta(seconds=30),
interval=datetime.timedelta(minutes=1),
parameter_defaults={"p": "CLOCK 2"})
# the full schedule
schedule = Schedule(clocks=[clock1, clock2])
flow.schedule = schedule # set the schedule on the Flow
flow.run()
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 2
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 1
...
schedules.Schedule(
# fire every hour
clocks=[clocks.IntervalClock(timedelta(hours=1))],
# but only on weekdays
filters=[filters.is_weekday],
# and only at 9am or 3pm
or_filters=[
filters.between_times(pendulum.time(9), pendulum.time(9)),
filters.between_times(pendulum.time(15), pendulum.time(15)),
],
# and not in January
not_filters=[filters.between_dates(1, 1, 1, 31)]
)
schedules.Schedule(
# fire every day
clocks=[clocks.IntervalClock(timedelta(days=1))],
# filtered for month ends
filters=[filters.is_month_end],
# and run on the next weekday
adjustments=[adjustments.next_weekday]
)