📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 概览
  • 简单调度计划
  • 复杂调度计划
  • 时间间隔时钟
  • 时间点定时时钟
  • 日期时钟
  • 调度计划产生Prefect参数(0.9.2以上版本支持)
  • 过滤器
  • 调整

Was this helpful?

  1. 核心概念

Schedule.调度计划

概览

Prefect假设可以出于任何原因随时运行flow实例。但是,在指定时间自动执行flow实例通常很有用。可以通过schedule关键字参数简单的将调度计划附加到flow。对于更详细或更复杂的调度计划,Prefect提供通用的schedule对象,该对象可进行细微的日期时间调整和过滤,以及根据计划的时间更新参数值。

简单调度计划

可以通过schedule关键字参数简单的将调度计划附加到flow:

from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("Hello, world!")

schedule = IntervalSchedule(interval=timedelta(minutes=2))

with Flow("Hello", schedule) as flow:
    say_hello()

flow.run()

复杂调度计划

Prefect调度计划包含三个组件:

  • 发出事件的时钟。例如,一个IntervalClock可能每小时发出一个事件;一个CronClock可以根据cron定时类型字符串发出事件;单个调度计划可能包含多个时钟。时钟还可以用于为每个flow运行实例指定不同的参数值

  • 决定是否应包含事件的过滤器。例如,可能设置了仅允许在工作日或在工作时间内发送事件的过滤器

  • 可通过过滤器修改事件的调整。例如,一个调整可以将事件延迟到下一个工作日或该月的最后一个工作日。

这三个组件允许用户将简单函数组合为复杂行为。

时间间隔时钟

最基本的Prefect时钟是IntervalClock。它采用一个时间间隔参数,并定期发出事件。可以设置可选参数start_date,在这种情况下,时间间隔是相对于该日期的,还可以提供可选参数end_date。

Prefect不支持小于或等于分钟级别的计划。

from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(clocks=[IntervalClock(timedelta(hours=24))])

schedule.next(5)

时区

想要将调度计划固定在某个时区吗?为你的时钟指定一个与该时区相对应的start_date,例如:

schedules.clocks.IntervalClock(
    start_date=pendulum.datetime(2019, 1, 1, tz="America/New_York"),
    interval=timedelta(days=1)
)

关于夏令时。

夏令时

如果IntervalClock的开始时间带有DST观察时区,则调度计划将进行适当的调整。大于24小时的时间间隔将遵循DST标准,而小于24小时的时间间隔将遵循UTC标准。例如,一个小时的调度计划将在每个UTC小时触发一次,甚至跨越DST边界。当时钟回拨时,这将导致两个运行实例似乎都安排在本地时间凌晨1点进行,即使它们与UTC时间相隔一个小时。对于较长的时间间隔,例如每日调度计划,时间间隔调度计划将针对DST边界进行调整,以使时钟的小时保持不变。这意味着始终在上午9点触发的每日调度计划会遵守DST标准,并会继续在当地时区的上午9点触发。

注意此行为与CronClock不同。

时间点定时时钟

时钟也可以通过用cron定时类型字符串设置Prefect的CronClock来生成:

from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

schedule = Schedule(clocks=[CronClock("0 0 * * *")])

schedule.next(5)

夏令时

如果CronClock的开始时间带有DST观察时区,则调度计划将自行调整。Cron的DST规则基于时钟时间,而不是时间间隔。这意味着每小时cron计划将在每个新的小时的时间点而非每隔一个小时触发。例如,如果将时钟回拨,这将导致两个小时的时间点触发暂停,因为调度计划将在120分钟后的第一次凌晨1点触发和第一次凌晨2点触发。较长的调度计划(例如每天早上9点触发的调度计划)会自动调整DST。

注意此行为与IntervalClock不同。

日期时钟

对于更不同的场景的调度计划,Prefect提供一个DatesClock,仅在用户指定的特定日期触发。

from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock

schedule = Schedule(
    clocks=[DatesClock([pendulum.now().add(days=1), pendulum.now().add(days=2)])])

schedule.next(2)

调度计划产生Prefect参数(0.9.2以上版本支持)

所有时钟都支持可选参数parameter_defaults,该参数允许用户为由此时钟生成的每个flow运行实例指定不同的Prefect参数。例如,假设我们有以下flow记录传递给它的Parameter的值:

import prefect
from prefect import task, Flow, Parameter

@task
def log_param(p):
    logger = prefect.context['logger']
    logger.info("Received parameter value {}".format(p))

p = Parameter("p", default=None, required=False)

with Flow("Varying Parameters") as flow:
    log_param(p)

每次运行此flow时,我们都可以选择为Prefect参数p传递一个新值,如果要按固定的调度计划运行flow,则可能要根据调用哪个调度计划为p设定不同的值,我们可以通过使用时钟来做到这一点:

import datetime
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

clock1   = clocks.IntervalClock(start_date=now, 
                                interval=datetime.timedelta(minutes=1), 
                                parameter_defaults={"p": "CLOCK 1"})
clock2   = clocks.IntervalClock(start_date=now + datetime.timedelta(seconds=30), 
                                interval=datetime.timedelta(minutes=1), 
                                parameter_defaults={"p": "CLOCK 2"})

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])

flow.schedule = schedule # set the schedule on the Flow
flow.run()

当按上述调度计划运行此flow时,每次新的实例运行都会在日志中看到Prefect参数值变化:

...
INFO - prefect.Task: log_param | Received parameter value CLOCK 2
...
INFO - prefect.Task: log_param | Received parameter value CLOCK 1
...

过滤器

Prefect提供了丰富多样的事件过滤器,包括:

  • on_datetime(允许在特定日期时间发送事件)

  • on_date(允许在特定日期发送事件,例如3月15日)

  • at_time(允许在特定时间发送事件,例如下午3:30)

  • between_datetimes(允许两个特定日期时间之间发送事件)

  • between_times(允许两个事件之间发送事件,例如上午9点至下午5点)

  • between_dates(允许两个日历日期之间发送事件,例如1月1日至3月31日)

  • is_weekday(允许在工作日发送事件)

  • is_weekend(允许在周末发送事件)

  • is_month_end(允许在月末发送事件)

Filters can be provided to schedules in three different ways: 可以通过三种不同的方式向计划提供过滤器:

  • filters:所有过滤器必须返回True才能发送事件

  • or_filters:至少有一个filter返回True才能发送事件

  • not_filters:所有过滤器必须返回False才能发送事件

schedules.Schedule(
    # fire every hour
    clocks=[clocks.IntervalClock(timedelta(hours=1))],
    # but only on weekdays
    filters=[filters.is_weekday],
    # and only at 9am or 3pm
    or_filters=[
        filters.between_times(pendulum.time(9), pendulum.time(9)),
        filters.between_times(pendulum.time(15), pendulum.time(15)),
    ],
    # and not in January
    not_filters=[filters.between_dates(1, 1, 1, 31)]
)

调整

通过调整,调度计划可以修改时钟发出的日期,并传入多个过滤器:

  • 添加(为日期添加间隔)

  • next_weekday(将日期延迟到下一个工作日)

schedules.Schedule(
    # fire every day
    clocks=[clocks.IntervalClock(timedelta(days=1))],

    # filtered for month ends
    filters=[filters.is_month_end],

    # and run on the next weekday
    adjustments=[adjustments.next_weekday]
    )
Previous结果对象和结果处理器NextSecret.秘钥

Last updated 5 years ago

Was this helpful?

你可以在此中看到cron计划。

示例
Prefect官网
英版原文
联系译者