```python
def extract(...):
    # fetch all aircraft and reference data
    ...
    return all_data

def transform(all_data):
    # clean the live data
    ...

def load(all_data):
    # save all transformed data and reference data to the database
    ...
```
```python
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

def extract_live_data(...):
    # fetch live data
    ...
    return live_data

def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

def load_reference_data(reference_data):
    # save reference data to the database
    ...

def load_live_data(transformed_data):
    # save transformed live data to the database
    ...
```
Applying Prefect
Now we have appropriately sized functions and we know how they relate to one another. That means we can wrap our workflow with Prefect.
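The relationships between the functions can also be written down explicitly as data. The sketch below is plain Python, not Prefect. The dictionary `dependencies` is a hypothetical name: it maps each function name to the functions whose outputs it consumes. The standard-library `graphlib.TopologicalSorter` (Python 3.9+) then derives one valid execution order from that map.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step -> the steps whose outputs it consumes.
dependencies = {
    "extract_reference_data": set(),
    "extract_live_data": set(),
    "transform": {"extract_live_data", "extract_reference_data"},
    "load_reference_data": {"extract_reference_data"},
    "load_live_data": {"transform"},
}

# static_order() yields every step only after all of its upstream steps.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order consistent with these edges is valid. For example, the two extract steps do not depend on each other, so a scheduler would be free to run them concurrently.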
Step 1
Use prefect.task to decorate every function that Prefect should schedule and run:
```python
# Prefect 1.x (Prefect Core) API; Flow is used in Step 2
from prefect import task, Flow

@task
def extract_reference_data(...):
    # fetch reference data
    ...
    return reference_data

@task
def extract_live_data(...):
    # fetch live data
    ...
    return live_data

@task
def transform(live_data, reference_data):
    # clean the live data
    ...
    return transformed_data

@task
def load_reference_data(reference_data):
    # save reference data to the database
    ...

@task
def load_live_data(transformed_data):
    # save transformed live data to the database
    ...
```
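Decorating a function does not change what it computes. Instead, it wraps the function in a task object that a scheduler can manage. In Prefect 1.x, tasks are typically unit-tested by calling their `.run()` method, which invokes the underlying function directly.

The sketch below imitates only that wrapping idea in plain Python. `SimpleTask` and `simple_task` are hypothetical names, not Prefect classes. The cleaning rule and the airport data are made up for illustration.

```python
from typing import Any, Callable


class SimpleTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Execute the wrapped function directly, e.g. from a unit test.
        return self.fn(*args, **kwargs)


def simple_task(fn: Callable[..., Any]) -> SimpleTask:
    # Decorator: replace the plain function with a task object wrapping it.
    return SimpleTask(fn)


@simple_task
def transform(live_data, reference_data):
    # Hypothetical cleaning rule: keep live records whose airport is known.
    return [r for r in live_data if r["airport"] in reference_data]


print(transform.name)  # transform
print(transform.run([{"airport": "SFO"}, {"airport": "XXX"}], {"SFO": "San Francisco"}))
```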
Step 2
Inside a prefect.Flow context, wire up the data passed between tasks and the dependencies between them:
```python
# ...task definitions above

with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()
    transformed_live_data = transform(live_data, reference_data)
    load_reference_data(reference_data)
    load_live_data(transformed_live_data)
```
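Inside the `with Flow(...)` block, calling a task does not run it. Each call records a node in the flow, and passing one task's result into another records an edge. The tasks are executed later, in dependency order. With Prefect 1.x itself, the flow built above would typically be executed with `flow.run()`.

The sketch below reproduces that mechanism in plain Python to make it concrete. `MiniFlow`, `mini_task` and `Node` are hypothetical names, not Prefect's API. The task bodies and airport data are made-up stand-ins for the elided code above.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Optional


class Node:
    """One task call recorded in a flow: the function plus its arguments."""

    def __init__(self, fn: Callable[..., Any], args: tuple) -> None:
        self.fn = fn
        self.args = args


class MiniFlow:
    """Hypothetical, simplified flow: records task calls now, runs them later."""

    active: Optional["MiniFlow"] = None

    def __init__(self, name: str) -> None:
        self.name = name
        self.nodes: list = []

    def __enter__(self) -> "MiniFlow":
        MiniFlow.active = self
        return self

    def __exit__(self, *exc: Any) -> None:
        MiniFlow.active = None

    def run(self) -> dict:
        # An edge exists wherever one node's result is passed into another.
        graph = {n: {a for a in n.args if isinstance(a, Node)} for n in self.nodes}
        results: dict = {}
        for node in TopologicalSorter(graph).static_order():
            # Upstream results are ready because static_order() yields them first.
            args = [results[a] if isinstance(a, Node) else a for a in node.args]
            results[node] = node.fn(*args)
        return results


def mini_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    def bind(*args: Any) -> Node:
        # Calling a task inside a MiniFlow records a Node instead of executing.
        if MiniFlow.active is None:
            raise RuntimeError("mini_task called outside a MiniFlow context")
        node = Node(fn, args)
        MiniFlow.active.nodes.append(node)
        return node
    return bind


loaded = []  # stands in for the database

@mini_task
def extract_reference_data():
    return {"SFO": "San Francisco"}

@mini_task
def extract_live_data():
    return [{"airport": "SFO"}, {"airport": "XXX"}]

@mini_task
def transform(live_data, reference_data):
    return [r for r in live_data if r["airport"] in reference_data]

@mini_task
def load_reference_data(reference_data):
    loaded.append(("reference", len(reference_data)))

@mini_task
def load_live_data(transformed_data):
    loaded.append(("live", len(transformed_data)))


with MiniFlow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()
    transformed_live_data = transform(live_data, reference_data)
    load_reference_data(reference_data)
    load_live_data(transformed_live_data)

results = flow.run()
print(sorted(loaded))  # [('live', 1), ('reference', 1)]
```

Separating graph construction from execution is what lets a scheduler decide when each task runs. The real Prefect engine layers much more on top, such as task states, retries and executors, all of which this sketch omits.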