📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 缓存输入
  • 缓存输出
  • 检查点

Was this helpful?

  1. 核心概念

持久化缓存

开箱即用,Prefect Core不会永久保存数据。所有数据,结果和缓存状态都存储在运行该flow的Python进程的内存中。但是,Prefect Core提供了所有必要的钩子,用于在外部位置持久/查询数据。如果你需要开箱即用的持久层,则可以考虑使用Prefect Cloud。

Prefect提供了几种处理缓存数据的方法。在任何可能的情况下,缓存都将自动进行,或以最少的用户输入触发。

  • 缓存输入(Input Caching)

  • 缓存输出(Output Caching)

  • 检查点(Checkpointing)

缓存输入

在运行Prefect flow时,通常有一些需要在将来重新运行的task。例如,当task失败并需要重试时,或者task具有manual_only触发器时,可能会发生这种情况。

每当Prefect检测到将来需要运行某个task时,它将自动缓存该task需要运行的所有信息,并将其存储在结果状态中。下次Prefect遇到该task时,关键信息将反序列化并用于运行task。

自动缓存

缓存输入是自动缓存。Prefect会在必要时自动应用它。

缓存输出

有时,最好缓存task的输出,以避免将来重新计算它。这种模式的常见示例包括不太可能更改的昂贵或费时的计算。在这种情况下,用户可以指定应该将task缓存一定的持续时间,或者只要满足某些条件即可。

这种机制有时称为“时间旅行”,因为它使一个flow运行中计算出的结果可用于其他运行。

缓存输出由三个task参数控制:cache_for,cache_validator和cache_key。

  • cache_for:一个时间长度,表示应将输出缓存多长时间

  • cache_validator:一个可调用对象,表示缓存应如何过期。默认值为duration_only,这意味着缓存将在cache_for的持续时间内处于有效状态。其他验证器可以在prefect.engine.cache_validators中找到,并且包括用于在task接收到不同的输入或flow以不同的参数运行时失效缓存的机制

  • cache_key:用于存储输出缓存的可选参数;指定此参数将允许不同的task以及不同的flow共享一个公共缓存

# this task will be cached for 1 hour
task_1 = prefect.Task(
    cache_for=datetime.timedelta(hours=1))

# this task will be cached for 1 hour, but only if the flow is run with the same parameters
task_2 = prefect.Task(
    cache_for=datetime.timedelta(hours=1),
    cache_validator=prefect.engine.cache_validators.all_parameters)

缓存存储在上下文中

注意,在本地运行Prefect Core时,task的缓存状态将存储在内存的prefect.context的对象中。

检查点

通常,将task的数据保存在外部存储很有用。你总是可以将此逻辑直接直接写入task本身,但这有时会使测试变得困难。Prefect提供了task“检查点”的概念,以确保每次成功运行task时都会调用其结果处理器。要配置用于检查点的task,就得提供结果处理器,并在task初始化时设置checkpoint=True:

from prefect.engine.result_handlers import LocalResultHandler
from prefect import task, Task


class MyTask(Task):
    def run(self):
        return 42


# create a task via initializing our custom Task class
class_task = MyTask(
    checkpoint=True, result_handler=LocalResultHandler(dir="~/.prefect")
)


# create a task via the task decorator
@task(checkpoint=True, result_handler=LocalResultHandler(dir="~/.prefect"))
def func_task():
    return 99
Previous通知和状态处理器Next结果对象和结果处理器

Last updated 5 years ago

Was this helpful?

Prefect Core中的默认设置是关闭检查点,Prefect Cloud 0.9.1+中的默认设置是打开检查点。有关更多信息,阅读有关的概念文档以及有关的设置教程。

结果和结果处理器
使用结果处理器
Prefect官网
英版原文
联系译者