结果处理器
让我们看一下使用结果处理器。正如概念文档结果和结果处理器中所介绍的那样,Prefect task将其返回值与附加task状态的Result对象相关联,并且可以使用ResultHandler对其进行扩展,启用后,结果处理器可以将结果序列化持久保存到任一存储后端。
为什么首先要使用结果处理器?一种常见的情况是允许在调试流程(包括尤其是生产流程)时检查中间数据。通过在不同的数据转换task期间检查数据,可以按照顺序分析检查点来跟踪运行时数据错误。
设置处理结果
必须调用结果处理器启用检查点。必须在以下两个位置启用:在全局范围为Prefect安装启用,并在task级别上提供结果处理器或flow/task初始化时参数覆盖。根据配置的结果处理器的不同,可能还需要确保正确设置身份验证秘钥。对于Prefect 0.9.1+上的Cloud用户,其中的一些默认情况下会处理。
对于Prefect版本<0.9.1的仅Core用户或Cloud用户,注意:
通过将prefect.config.flows.checkpointing设置为True,全局选择加入Checkpoint
至少为task的一个特定级别(flow级别或task级别)指定结果处理器
对于Prefect 0.9.1+版以上的Cloud用户:
检查点将自动打开;可以通过向它的每个task传递checkpoint=False来关闭禁用它
匹配prefect.config.flows.storage设置的存储后端的结果处理器将自动应用于所有task(如果有);值得注意的是,Docker存储尚不支持此功能
可以在全局级别、flow级别或task级别覆盖自动结果处理器
flow级别设置结果处理器
task级别设置结果处理器
选择结果处理器
在以上示例中,我们仅使用了LocalResultHandler类。这是与不同存储后端集成的几种结果处理器之一。有关各种结果处理器完整列表,请参见prefect.engine.results_handler的API文档,有关该使用接口的更多详细信息,请参见结果和结果处理器文档。
我们可以编写自定义结果处理器,只要它们扩展ResultHandler规范即可;或者我们可以从Prefect Core中选择一个利用合适的存储后端的现有实现;例如,我将使用prefect.engine.results_handler.GCSResultHandler,以便将数据保留在Google Cloud Storage中。
在flow中使用GCSResultHandler
由于必须使用一些初始化参数来实例化GCSResultHandler对象,因此在配置它之后,我们传递flow级别Python对象来覆盖设置:
确保Prefect安装可以通过Google的Cloud API进行身份验证。只要我的Prefect安装主机可以对此GCS空间进行身份验证,每个task的返回值都将在此GCS空间中序列化为自己的文件。
运行完flow后,当我在内存中检查它们时,可以看到task状态,知道它们各自的结果存储在Result.safe_value中的键名:
使用gsutil,我可以看到这些键存在于配置的结果处理器提交数据的目标GCS空间中:
如果使用的是Prefect Cloud,则可以看到来自“safe value”的元数据也已存储并显示在UI中:
使用JSONResultHandler运行flow
JSONResultHandler是一种独特的情况,因为它会将整个Result对象序列化为其Result.safe_value。这仅对少量数据负载和Cloud用户轻松与Cloud数据库共享的数据有用。有效使用它们后,可以有效检查UI中的数据输出。因此,所有Prefect参数类型task都将其用作结果处理器。
让我们看一个例子。数字相加的相同flow将改为使用JSON结果处理器进行配置:
现在,当我运行flow时,Result.safe_value包含其中的task的实际返回值:
而且,在检查task运行详细信息时,此值3对UI中的Cloud用户也是可见的:
Last updated