假设我在 Dagster 中有两个实体连接在管道上。第一个实体可以执行某些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了实现此结果,当数据满足无效条件时,我会引发错误,以便管道停止并跳过其余的实体。
提出错误来解决我的用例似乎很棘手,有没有一种方法可以让我跳过管道其余部分的执行而不诉诸异常?
from dagster import solid, pipeline
@solid
def solid_1(context, x: int):
y = x + 1
if y%2 == 0:
raise "No even number is further processed"
return y
@solid
def solid_2(context, y:int):
return y**2
@pipeline
def toy_pipeline():
solid_2(solid_1())
Run Code Online (Sandbox Code Playgroud)
在这个非常人为的示例中,只有当第一个实体的输出为奇数时才应执行实体 2。
在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不将执行标记为失败而是标记为成功是有意义的。可以检查每个下游实体中的数据是否满足条件,但这很快就会增加样板文件。当接收数据的实体找不到要处理的数据时,最好有一种方法可以跳过所有下游实体的执行。
为了实现您想要的行为,可以使用is_required=False相应的参数将输出标记为可选OutputDefinition。这意味着输出不一定必须由固体产生。
如果未产生可选输出,则依赖于该输出的所有下游固体将简单地跳过。这对于短路管道(这是您的用例)或更复杂的分支逻辑很有用。当跳过实体时,管道运行不会被标记为失败。
您使用类型提示来定义输入和输出类型,但由于您需要指定参数is_required,因此需要使用显式OuputDefinition.
from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List
def query_db():
return []
@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
rows = query_db()
if len(rows) > 0:
yield Output(rows, output_name="data")
@solid
def solid_2(context, data: List[int]):
context.log.info(str(data))
pass
@pipeline
def my_pipeline():
solid_2(solid_1())
Run Code Online (Sandbox Code Playgroud)
实体也可以使用类型提示来solid_2定义。InputDefinition类型提示是以下语法糖InputDefinitions:
@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
context.log.info(str(data))
# Process data
pass
Run Code Online (Sandbox Code Playgroud)
附带说明:一般来说,异常是将实体标记为失败的正确方法,并且在 Dagster 代码中不被视为 hacky。