📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • Task日志
  • 丰富的存储handler
  • 先进的控制结构
  • 现成的task库

Was this helpful?

  1. 初级教程

task更多特性

Previous水平扩展Next核心概念

Last updated 5 years ago

Was this helpful?

本节简单介绍Prefect能如何增强工作流行为。但是还有很多的Prefect特性不能覆盖到。

Task日志

教程里面的task通过Python内置print()函数展示信息:

@task
def load_reference_data(ref_data):
    print("saving reference data...")

    db = aclib.Database()
    db.update_reference_data(ref_data)

相应的,task中已经有一个Logger对象可供使用:

@task
def load_reference_data(ref_data):
    logger = prefect.context.get("logger")
    logger.info("saving reference data...")

    db = aclib.Database()
    db.update_reference_data(ref_data)

阅读有关日志的更多信息:

丰富的存储handler

在这个特定的航班数据ETL示例中,我们没有明确的需要在过程的每步中保持中间结果。但是,Prefect提供ResultHandler抽象使用户能够将每个task返回的结果持久化存储到所选的存储中:

from prefect.engine.result_handlers import LocalResultHandler

handler = LocalResultHandler(dir="./my-results")

with Flow("Aircraft-ETL", result_handler=handler) as flow:
    ...

flow.run()

现在,flow执行后,各个task生成的所有结果都将写到./my-results目录中。Prefect提供支持许多的结果处理handler,以将结果持久化存储到常见的存储选项中,例如S3,Google Cloud Storage,Azure Storage等。

阅读有关结果处理handler的更多信息:

先进的控制结构

在本教程中,我们可以更进一步,要求获取多个Y机场的X半径内的所有航班数据。在这种情况下,mapping是解决问题的最简单方法,为每个所需的机场调用extract_live_data task。

阅读有关控制结构的更多信息:

现成的task库

在我们的教程中,task始终是用户定义的函数,但是Prefect提供开箱即用task的库。这是flow使用ShellTask​​s运行任意命令的示例:

from prefect import Flow
from prefect.tasks.shell import ShellTask

shell = ShellTask()
with Flow("Simple Pipeline") as flow:
   flow.chain(
       shell(command='pip install -r requirements.txt'),
       shell(command='black --check .'),
       shell(command='pytest .'),
   )

flow.run()

task库包含与Kubernetes,GitHub,Slack,Docker,AWS,GCP等的集成!

接下来,Prefect还能做什么呢?

使用Prefect内置日志API
定制化日志
结果处理handler概念
task检查
Prefect内置提供的结果处理handler
Mapping
Task Looping
Signaling
Prefect官网
英版原文
联系译者