📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • 概览
  • Flow执行器
  • 参数
  • Task执行器
  • 执行器
  • 使用Dask执行器

Was this helpful?

  1. 核心概念

Engine.引擎

概览

Prefect的执行模型围绕FlowRunner和TaskRunner这两个类构建,它们产生并在State对象上操作。实际执行由Executor类处理,该类可以与外部环境交互。

Flow执行器

flow执行器接收flow并尝试运行其所有task。它收集结果状态,并在可能的情况下返回flow的最终状态。

flow执行器一次遍历所有task。如果task在那一遍之后仍未完成,例如,如果其中一个task需要重试,则将需要第二个循环来尝试完成它们。将所有task(以及flow本身)移至完成状态所需的尝试次数没有限制。

参数

具有参数的flow可能需要参数值(如果这些参数没有默认值)。运行时,必须将参数值传递到flow实例。

Task执行器

task执行器负责执行单个task。它接收task的初始状态以及任何上游状态,并使用这些状态来表示执行管道的结果。例如:

  • task必须处于Pending状态

  • 上游task必须完成

  • task的触发函数必须执行

如果满足这些条件(以及其他一些条件),则task可以进入Running状态。

然后,根据task,它可以是.run()或可以被映射,这涉及创建动态的task实例执行器。

最后,task在后期处理管道中移动,该管道检查是否应重试或缓存该task。

执行器

执行器类负责实际执行的task。例如,flow执行器将把每个task执行器提交给它的执行器,并等待结果。我们建议将Dask分布式执行器作为首选的执行引擎。

执行器具有相对简单的API:用户可以提交函数并等待其结果。

对于测试和开发,首选LocalExecutor。除非明确声明,否则它将在本地进程中同步运行每个函数,并且是flow的默认执行器。

SynchronousExecutor稍微复杂一些。它仍然在单个线程中运行函数,但是使用Dask的调度逻辑。

DaskExecutor是一个完全异步的引擎,可以在分布式Dask集群中运行函数。这是推荐用于生产的引擎。

使用Dask执行器

执行器可以在flow的运行时提供:

from prefect import task, Flow

@task
def say_hello():
    print("Hello, world!")

with Flow("Run Me") as flow:
    h = say_hello()

from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(address="tcp://localhost:8786")
flow.run(executor=executor)

该DaskExecutor将通过地址tcp://localhost:8786连接到Dask调度器,并开始提交要在Dask工作者上执行的工作。

动态调度器

不同的调度器使用时是有些差别的。

LocalDaskExecutor vs DaskExecutor

LocalDaskExecutor和DaskExecutor之间的主要区别在于调度器的选择。可以将LocalDaskExecutor配置为使用任意数量的调度器,而DaskExecutor使用分布式调度器。这意味着LocalDaskExecutor可以帮助实现一些多线程/多进程,但是它没有提供与DaskExecutor一样多的分布式功能。

PreviousState.状态NextExecution.执行

Last updated 5 years ago

Was this helpful?

如果没有为DaskExecutor指定调度器地址,则将创建一个进程内调度器,并在完成时将其清除掉。更多信息参见。

DaskExecutor API文档
Prefect官网
英版原文
联系译者