defextract(...):# fetch all aircraft and reference data ...return all_datadeftransform(all_data):# clean the live data ...defload(all_data):# save all transformed data and reference data to the database ...
defextract_reference_data(...):# fetch reference data ...return reference_datadefextract_live_data(...):# fetch live data ...return live_datadeftransform(live_data,reference_data):# clean the live data ...return transformed_datadefload_reference_data(reference_data):# save reference data to the database ...defload_live_data(transformed_data):# save transformed live data to the database ...
套用Prefect
现在我们有适当大小的函数,并且知道这些函数之间的关系,可以用Prefect封装我们的工作流。
第1步
使用prefect.task装饰需要Prefect调度执行的任何函数:
from prefect import task, Flow@taskdefextract_reference_data(...):# fetch reference data ...return reference_data@taskdefextract_live_data(...):# fetch live data ...return live_data@taskdeftransform(live_data,reference_data):# clean the live data ...return transformed_data@taskdefload_reference_data(reference_data):# save reference data to the database ...@taskdefload_live_data(transformed_data):# save transformed live data to the database ...