📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 有状态的task
  • 在Task类的.run()方法持久化数据
  • 序列化
  • task结果和输入
  • 完整的flow

Was this helpful?

  1. 核心概念

常见问题

有状态的task

在Task类的.run()方法持久化数据

task的.run()方法中存储的数据对以后的运行不可用。

尽管这在本地测试期间可能会起作用,但你应该假设每次Prefect task运行时,它都是在全新的环境中进行的。即使task运行两次,它也将无法访问在上一次运行中设置的本地状态。

所以,你不应该这么做:

class BadCounterTask(Task):

    def __init__(self, **kwargs):
        self.counter = 0
        super().__init__(**kwargs)

    def run(self):
        self.counter += 1 # this won't have the intended effect
        return self.counter

序列化

task结果和输入

Prefect中的大多数基础通信模式都要求通信对象可以序列化为某种格式(通常是JSON或二进制)。例如,当task返回有效数据时,结果会使用cloudpickle序列化以与其他Dask工作者进行通信。因此,你应尝试确保创建并提交给Prefect的所有结果和对象都可以适当地序列化。 通常,(反)序列化错误可能是非常难以理解的。

完整的flow

此外,在创建和提交新的flow以进行部署时,应检查以确保可以正确序列化和反序列化你的flow。为了帮助解决此问题,Prefect具有内置的is_serializable实用程序函数,使你可以放心地处理flow:

from prefect.utilities.debug import is_serializable

is_serializable(my_flow) # returns True / False

注意这并不是保证,只是帮助尽早发现问题。

提示

一个好的经验法则是,你依赖的每个对象/函数都应显式附加到flow的某个属性(例如task),或者可以导入Docker打包的某些库。

Previous最佳实践Nexttask库

Last updated 5 years ago

Was this helpful?

更健壮的开发流程是检查:flow可以序列化,部署能本地创建Docker存储,flow可以在容器中反序列化。有关如何执行此操作的详细信息,参见教程中的相应部分。

本地调试
Prefect官网
英版原文
联系译者