使用Parameters
跟随终端演示:
cd examples/tutorial python 03_parameterized_etl_flow.py
上一节教程中,我们将Aircraft ETL脚本重构为Prefect flow。但是,extract_live_data task是硬编码,只能提取特定区域内的航班数据,在这种情况下,Dulles国际机场周围的半径为200公里:
@task
def extract_live_data():
# Get the live aircraft vector data around Dulles airport
dulles_airport_position = aclib.Position(lat=38.9519444444, long=-77.4480555556)
area_surrounding_dulles = aclib.bounding_box(dulles_airport_position, radius_km=200)
print("fetching live aircraft data...")
raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area_surrounding_dulles)
return raw_aircraft_data
允许从广泛范围获取数据,而不仅仅是在单个机场附近,这是理想方案。一种方法是允许extract_live_data采用经度和纬度参数。但是,我们可以走得更远:事实证明,我们的关联数据中有已经机场位置信息,可以利用!
让我们重构我们的Python函数以从关联数据获取用户指定的机场:
@task
def extract_live_data(airport, radius, ref_data):
# Get the live aircraft vector data around the given airport (or none)
area = None
if airport:
airport_data = ref_data.airports[airport]
airport_position = aclib.Position(
lat=float(airport_data["latitude"]), long=float(airport_data["longitude"])
)
area = aclib.bounding_box(airport_position, radius)
print("fetching live aircraft data...")
raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area)
return raw_aircraft_data
假如你感到好奇,area=None将不会获取所有已知航班的实时数据,而不管其所在的区域如何。
如何在Prefect flow中控制这些函数参数呢?通过使用prefect.Parameter:
from prefect import Parameter
# ...task definitions...
with Flow("Aircraft-ETL") as flow:
airport = Parameter("airport", default="IAD")
radius = Parameter("radius", default=200)
reference_data = extract_reference_data()
live_data = extract_live_data(airport, radius, reference_data)
transformed_live_data = transform(live_data, reference_data)
load_reference_data(reference_data)
load_live_data(transformed_live_data)
就像task一样,直到调用flow.run()时,才使用默认值(如果提供的话)或传递给.run()的覆盖值设定参数:
# Run the Flow with default airport=IAD & radius=200
flow.run()
# ...default radius and a different airport!
flow.run(airport="DCA")
最后,请注意我们的执行图已改变:现在获取实时数据取决于获取到的关联数据:
接下来,如果一个task失败了会怎样,以及我们如何制定出现问题时采取的措施?
Last updated
Was this helpful?