# Naive ETL: one extract pulls everything, one transform cleans it, one load
# writes everything back.
# NOTE(review): `transform` is defined again later in this file with a
# different signature; the later definition is the one bound at import time.


def extract():
    """Fetch all aircraft data and reference data in a single call.

    Placeholder -- replace the body with the real fetch.

    Returns:
        The combined data set (presumably the ``all_data`` that ``transform``
        and ``load`` take -- confirm).

    Raises:
        NotImplementedError: until the fetch is implemented.
    """
    raise NotImplementedError("fetch all aircraft and reference data")


def transform(all_data):
    """Clean the live data contained in *all_data*.

    Placeholder -- replace the body with the real cleaning logic.

    Args:
        all_data: The combined data set to clean.

    Raises:
        NotImplementedError: until the cleaning logic is implemented.
    """
    raise NotImplementedError("clean the live data")


def load(all_data):
    """Save all transformed data and reference data to the database.

    Placeholder -- replace the body with the real write. It raises rather
    than silently doing nothing, so an unfinished pipeline cannot appear to
    succeed while persisting nothing.

    Args:
        all_data: The data set to persist.

    Raises:
        NotImplementedError: until the write is implemented.
    """
    raise NotImplementedError(
        "save all transformed data and reference data to the database"
    )
def extract_reference_data():
    """Fetch the reference data.

    Placeholder -- replace the body with the real fetch.

    Returns:
        The reference data that ``transform`` and ``load_reference_data`` take.

    Raises:
        NotImplementedError: until the fetch is implemented.
    """
    raise NotImplementedError("fetch reference data")
def extract_live_data():
    """Fetch the live data.

    Placeholder -- replace the body with the real fetch.

    Returns:
        The raw live data that ``transform`` takes.

    Raises:
        NotImplementedError: until the fetch is implemented.
    """
    raise NotImplementedError("fetch live data")
def transform(live_data, reference_data):
    """Clean the live data.

    Placeholder -- replace the body with the real cleaning logic.

    Args:
        live_data: Raw live data, as returned by ``extract_live_data``.
        reference_data: Reference data supplied alongside the live data, as
            returned by ``extract_reference_data``.

    Returns:
        The transformed live data, to be passed to ``load_live_data``.

    Raises:
        NotImplementedError: until the cleaning logic is implemented.
    """
    raise NotImplementedError("clean the live data")
def load_reference_data(reference_data):
    """Save the reference data to the database.

    Placeholder -- replace the body with the real write. It raises rather
    than silently doing nothing, so an unfinished pipeline cannot appear to
    succeed while persisting nothing.

    Args:
        reference_data: Data returned by ``extract_reference_data``.

    Raises:
        NotImplementedError: until the write is implemented.
    """
    raise NotImplementedError("save reference data to the database")
def load_live_data(transformed_data):
    """Save the transformed live data to the database.

    Placeholder -- replace the body with the real write. It raises rather
    than silently doing nothing, so an unfinished pipeline cannot appear to
    succeed while persisting nothing.

    Args:
        transformed_data: Data returned by ``transform``.

    Raises:
        NotImplementedError: until the write is implemented.
    """
    raise NotImplementedError("save transformed live data to the database")
from prefect import task, Flow
@task
def extract_reference_data():
    """Prefect task: fetch the reference data.

    Takes no arguments, matching how the flow calls it. A ``*args``
    placeholder is deliberately avoided, since Prefect tasks do not accept
    variadic positional arguments.

    Placeholder -- replace the body with the real fetch.

    Returns:
        The reference data consumed by ``transform`` and ``load_reference_data``.

    Raises:
        NotImplementedError: until the fetch is implemented.
    """
    raise NotImplementedError("fetch reference data")
@task
def extract_live_data():
    """Prefect task: fetch the live data.

    Placeholder -- replace the body with the real fetch.

    Returns:
        The raw live data consumed by ``transform``.

    Raises:
        NotImplementedError: until the fetch is implemented.
    """
    raise NotImplementedError("fetch live data")
@task
def transform(live_data, reference_data):
    """Prefect task: clean the live data.

    Placeholder -- replace the body with the real cleaning logic.

    Args:
        live_data: Output of the ``extract_live_data`` task.
        reference_data: Output of the ``extract_reference_data`` task.

    Returns:
        The transformed live data consumed by ``load_live_data``.

    Raises:
        NotImplementedError: until the cleaning logic is implemented.
    """
    raise NotImplementedError("clean the live data")
@task
def load_reference_data(reference_data):
    """Prefect task: save the reference data to the database.

    Placeholder -- replace the body with the real write. It raises rather
    than silently doing nothing, so the flow cannot report success while
    persisting nothing.

    Args:
        reference_data: Output of the ``extract_reference_data`` task.

    Raises:
        NotImplementedError: until the write is implemented.
    """
    raise NotImplementedError("save reference data to the database")
@task
def load_live_data(transformed_data):
    """Prefect task: save the transformed live data to the database.

    Placeholder -- replace the body with the real write. It raises rather
    than silently doing nothing, so the flow cannot report success while
    persisting nothing.

    Args:
        transformed_data: Output of the ``transform`` task.

    Raises:
        NotImplementedError: until the write is implemented.
    """
    raise NotImplementedError("save transformed live data to the database")
# ...task definitions above

# Wire the tasks into the "Aircraft-ETL" flow. With Prefect's 1.x functional
# API (which `from prefect import task, Flow` suggests), calls inside the
# `with Flow(...)` block build the task graph from these data dependencies
# rather than running immediately -- confirm against the installed version.
with Flow("Aircraft-ETL") as flow:
    reference_data = extract_reference_data()
    live_data = extract_live_data()
    # transform consumes both extracts, so it depends on both of them.
    transformed_live_data = transform(live_data, reference_data)
    # NOTE(review): nothing here orders load_reference_data before
    # load_live_data. If the live tables reference the reference tables,
    # add an explicit dependency (e.g. `upstream_tasks=[...]`) -- confirm.
    load_reference_data(reference_data)
    load_live_data(transformed_live_data)