Prefect实现ETL

在本教程中,我们将使用Prefect来改进前面章节中ETL工作流的整体结构。

跟随终端演示:

教学代码一览

cd examples/tutorial
python 02_etl_flow.py

Extract,Transform,Load

Prefect的最小工作单元是Python函数。因此,我们的首要task是将示例分解成函数。

可能想到的第一个问题是“函数应该有多大/小?”。一种简单的方法是显式地将工作分解为“提取”,“转换”和“加载”函数,如下所示:

def extract(...):
    # fetch all aircraft and reference data
    ...
    return all_data

def transform(all_data):
    # clean the live data
    ...

def load(all_data):
    # save all transformed data and reference data to the database
    ...

这比较函数式,但它仍然不能解决原来的代码中的一些问题:

  • 如果提取实时航班数据失败,已经获取的关联数据怎么办?

  • 如果存储数据库不可用,已经转换的数据怎么办?

这些要点突出了一个事实,即.extract().load()仍是任意范围的。在决定每个功能的大小时,这使我们有了一个经验法则:查看工作流在每个步骤所需的输入和输出数据。在我们的场景,关联数据和实时航班数据来自不同的来源,并分开存储。考虑到这一新见解,让我们进行更多重构:

def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

def extract_live_data(...):
    # fetch live data
    ...
    return live_data

def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

def load_reference_data(reference_data):
    # save reference data to the database
    ...

def load_live_data(transformed_data):
    # save transformed live data to the database
    ...

套用Prefect

现在我们有适当大小的函数,并且知道这些函数之间的关系,可以用Prefect封装我们的工作流。

第1步

使用prefect.task装饰需要Prefect调度执行的任何函数:

from prefect import task, Flow

@task
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

@task
def extract_live_data(...):
    # fetch live data
    ...
    return live_data

@task
def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

@task
def load_reference_data(reference_data):
    # save reference data to the database
    ...

@task
def load_live_data(transformed_data):
    # save transformed live data to the database
    ...

第2步

在prefect.Flow上下文中指定业务数据和task依赖关系:

# ...task definitions above

with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()

    transformed_live_data = transform(live_data, reference_data)

    load_reference_data(reference_data)
    load_live_data(transformed_live_data)

注意:此时没有实际执行任何task,因为使用Flow(...):上下文管理器允许Prefect推理出task之间的依赖关系,并构建稍后将执行的执行图。这个业务场景的执行图如下所示:

与最初的实现相比有了很大的改进!

第3步

执行Flow

# ...flow definition above

flow.run()

此时,以适当的顺序执行Task(我们的Python函数),并按照执行图中的指定从task到task传递业务数据。

Prefect Task库

Prefect提供一个task库,其中包含常见的task实现以及与Kubernetes,GitHub,Slack,Docker,AWS,GCP等的集成!

接下来,让我们对flow进行参数化以使其复用性更好。

Last updated