如何避免在某些条件下运行 dagster 管道的其余部分

ElB*_*cas 4 dagster

假设我在 Dagster 中有两个实体连接在管道上。第一个实体可以执行某些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了实现此结果,当数据满足无效条件时,我会引发错误,以便管道停止并跳过其余的实体。

提出错误来解决我的用例似乎很棘手,有没有一种方法可以让我跳过管道其余部分的执行而不诉诸异常?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y%2 == 0:
        raise "No even number is further processed"

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())
Run Code Online (Sandbox Code Playgroud)

在这个非常人为的示例中,只有当第一个实体的输出为奇数时才应执行实体 2。

在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不将执行标记为失败而是标记为成功是有意义的。可以检查每个下游实体中的数据是否满足条件,但这很快就会增加样板文件。当接收数据的实体找不到要处理的数据时,最好有一种方法可以跳过所有下游实体的执行。

car*_*rte 6

为了实现您想要的行为,可以使用is_required=False相应的参数将输出标记为可选OutputDefinition。这意味着输出不一定必须由固体产生。

如果未产生可选输出,则依赖于该输出的所有下游固体将简单地跳过。这对于短路管道(这是您的用例)或更复杂的分支逻辑很有用。当跳过实体时,管道运行不会被标记为失败。

您使用类型提示来定义输入和输出类型,但由于您需要指定参数is_required,因此需要使用显式OuputDefinition.

from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List

def query_db():
    return []

@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
    rows = query_db()

    if len(rows) > 0:
        yield Output(rows, output_name="data")


@solid
def solid_2(context, data: List[int]):
    context.log.info(str(data))
    pass


@pipeline
def my_pipeline():
    solid_2(solid_1())
Run Code Online (Sandbox Code Playgroud)

实体也可以使用类型提示来solid_2定义。InputDefinition类型提示是以下语法糖InputDefinitions

@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
    context.log.info(str(data))
    # Process data
    pass
Run Code Online (Sandbox Code Playgroud)

附带说明:一般来说,异常是将实体标记为失败的正确方法,并且在 Dagster 代码中不被视为 hacky。