📁
prefect.docs
  • 介绍
  • 入门
    • 安装
    • Task和Flow
    • 触发器、关联task和信号
    • 为什么是Prefect
    • 为什么不是Airflow
  • 初级教程
    • ETL介绍
    • Prefect实现ETL
    • 使用Parameters
    • 失败处理
    • 调度执行
    • 水平扩展
    • task更多特性
  • 核心概念
    • Task.任务
    • Flow.流
    • Parameter.参数
    • State.状态
    • Engine.引擎
    • Execution.执行
    • Logging.日志
    • Mapping.映射
    • 通知和状态处理器
    • 持久化缓存
    • 结果对象和结果处理器
    • Schedule.调度计划
    • Secret.秘钥
    • Configuration.配置
    • 最佳实践
    • 常见问题
  • task库
    • Airtable
    • AWS.亚马逊云
    • Azure.微软云
    • Azure ML Service.微软云机器学习
    • Collection.集合
    • Constant.常量
    • Control Flow.控制流
    • Docker
    • Dropbox
    • Email
    • Function.函数
    • GCP.谷歌云
    • GitHub
    • Kubernetes
    • Operators.运算符
    • Postgres
    • Redis
    • RSS
    • Shell
    • Slack
    • Snowflake
    • SQLite
    • Strings.字符串
    • Twitter
  • 进阶教程
    • task映射并行和Prefect参数的高级特性
    • Prefect算子
    • 日志部署
    • Dask部署
    • ETL
    • 本地调试
    • Slack通知
    • Prefect Task剖析
    • 动态DAG和Task循环
    • 结果处理器
    • 工作流可视化
  • 样例
    • 概览
    • Airflow DAG
    • Task缓存
    • 按日收集github状态
    • ETL工作流
    • 工作流状态可视化
    • Docker Pipeline的函数式API
    • Github双周发布周期
    • Docker Pipeline的命令式API
    • 简易Map/Reduce
    • 参数化条件
    • 重试和映射
    • spaCy自然语言处理
    • 状态处理日志
    • Task循环
    • 发数据表至Airtable
  • 开发Issues
    • 内容提要
    • PIN-1:PINs介绍
    • PIN-2:数据处理器和元数据
    • PIN-3:执行环境
    • PIN-4:结果对象
    • PIN-5:组合Tasks
    • PIN-6:删除常量Tasks
    • PIN-7:存储和执行
    • PIN-8:事件驱动工作流
    • PIN-9:Prefect命令行
    • PIN 10:弹性调度计划
    • PIN 11:Task循环
    • PIN 12:环境回调
    • PIN 13:云部署
    • PIN-14:进阶事件驱动工作流
    • PIN-15:丢弃状态和信号
    • PIN-16:结果和目标
  • 开发规范
    • 概览
    • 编码风格
    • 文档注释
    • 测试
    • 贡献代码
    • 版本更新列表
    • 突破
  • 常见疑问
  • 开源社区
  • 代码贡献指南
Powered by GitBook
On this page
  • Prefect方法
  • 简单映射(map)
  • 迭代映射(map)
  • 规约(reduce)
  • 未映射的输入
  • 映射task的状态行为

Was this helpful?

  1. 核心概念

Mapping.映射

Prefect引入了灵活的map/reduce模型,用于动态生成执行并行task。

经典的map/reduce是一个功能强大的两阶段编程模型,可用于在收集和处理所有结果(reduce阶段)之前分配和生成并行化作业(map阶段)。

典型的map/reduce设置需要三件事:

  • 可迭代的输入

  • 一次在单个项目上运行的map功能

  • 一次处理一组项目的reduce功能

例如,我们可以使用map/reduce来获取数字列表,将它们全部加一,然后求和:

numbers = [1, 2, 3]
map_fn = lambda x: x + 1
reduce_fn = lambda x: sum(x)

mapped_result = [map_fn(n) for n in numbers]
reduced_result = reduce_fn(mapped_result)
assert reduced_result == 9

Prefect方法

Prefect的map/reduce版本比经典实现灵活得多。

映射task时,Prefect会为其输入数据的每个元素自动创建task的实例。task实例仅应用于该元素。这意味着映射的task实际上代表许多单个task实例的计算。

如果正常(非映射的)task依赖于映射task,则Prefect会自动应用规约操作以收集映射结果并将其传递给下游task。

但是,如果一个映射task依赖于另一个映射task,则Prefect不会规约上游结果。而是将第n个上游子实例连接到第n个下游子实例,从而创建独立的并行管道。

这是前面的示例改写成Perfect flow:

from prefect import Flow, task

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce') as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)

state = flow.run()
assert state.result[reduced_result].result == 9

动态生成的task实例是一等公民的task

即使用户没有明确创建它们,映射task的task实例也是一等公民的Prefect task。它们可以执行正常task可以执行的任何操作,包括成功,失败,重试,暂停或跳过。

简单映射(map)

最简单的Prefect映射函数接收一个task,并将其应用于其输入的每个元素。

例如,如果我们定义一个将数字加10的task,则可以将该task简单地应用于列表的每个元素:

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2, 3])

运行flow时,mapped_result task的结果将为[11,12,13]。

迭代映射(map)

由于mapped_result只不过是具有可迭代结果的task,因此我们可以立即将其用作另一轮映射的输入:

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('iterated map') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)

运行此flow时,mapped_result_2 task的结果将为[21、22、23],这是两次应用映射函数的结果。

无需规约(reduce)

即使我们观察到mapping_result的结果是一个列表,除非用户需要,否则Prefect不会应用规约步骤来收集该列表。在此示例中,我们不需要整个列表(我们只需要它的每个元素),因此没有发生规约。这两个映射的task生成了三个完全独立的管道,每个管道包含两个task。

规约(reduce)

如果非映射task需要映射结果,Prefect会自动将映射结果收集到列表中。因此,所有需要规约操作映射task结果的用户都将映射task的结果提供给task!

from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

@task
def sum_numbers(y):
    return sum(y)

with Flow('reduce') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)
    reduced_result = sum_numbers(mapped_result_2)

在此示例中,sum_numbers从mapped_result接收了自动规约的结果列表。它适时地计算总和:66。

未映射的输入

当task映射到其输入上时,它将保留相同的调用签名和参数,但会在输入上进行迭代以生成其task实例。有时,我们不想迭代其中一个输入,也许它是一个常量值,或者是需要的完整列表。为此,Prefect提供一个方便的unmapped()函数。

from prefect import Flow, task, unmapped

@task
def add(x, y):
    return x + y

with Flow('unmapped inputs') as flow:
    result = add.map(x=[1, 2, 3], y=unmapped(10))

This map will iterate over the x inputs but not over the y input. The result will be [11, 12, 13].

The unmapped function can be applied to any number of input arguments. This means that a mapped task can depend on both mapped and reduced upstream tasks seamlessly.

该映射将遍历输入参数x,但不会遍历输入参数y。结果将是[11,12,13]。

未映射的函数可以应用于任意数量的输入参数。这意味着映射的task可以无缝地依赖于映射的和规约的上游task。

映射task的状态行为

每当映射task被下游task规约时,Prefect都会将映射task子级视为该下游的task的输入。这意味着,除其他外,触发器函数将应用于所有映射的子级,而不是映射的父级。

如果规约task具有all_successful task,但是映射的子级之一失败了,则规约task的触发器将变成失败。这与已手动创建映射的子级代并将其传递给规约task的行为相同。跳过状态也会发生类似的行为。

PreviousLogging.日志Next通知和状态处理器

Last updated 5 years ago

Was this helpful?

Prefect官网
英版原文
联系译者