Dagster 循环实体的输出和并发处理

cya*_*yau 5 python concurrency pipeline dagster

我有一个由两个固体组成的 Dagster 管道(下面是可重现的示例)。第一个 ( return_some_list) 输出一些对象的列表。第二个实体 ( print_num) 接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理。

我该如何为第一个实体返回的列表中的每个元素调用第二个实体?还请解释任何最佳实践。

不确定这是否是最好的方法(让我知道),但我想print_num为第一个实体输出的每个元素生成一个不同的实体实例。这将帮助我将来并行化实体并更好地处理长/计算密集型实体。

from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
Run Code Online (Sandbox Code Playgroud)

cya*_*yau 6

事实证明,有一个实验性功能(希望将成为正式的),允许基于可迭代输出的元素创建任务。工作代码如下:

from dagster import execute_pipeline, pipeline, solid, Output, OutputDefinition
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List


@solid
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
Run Code Online (Sandbox Code Playgroud)

在这里,return_some_list返回一个可迭代的。我们希望为这个可迭代的每个元素运行一个实体。我们在solid中执行此操作generate_subtasks,它会生成一个DynamicOutput带有元素的元素以及为其生成的子任务的名称。的类型信息在规范中DynamicOutput给出。DynamicOutputDefinitionsolid

为了连接这些实体,我们首先通过 获取列表return_some_list。然后调用generate_subtasks,它是一个生成器,并map调用它的每个输出函数print_num

运行整个管道应该为 生成的每个子任务打印大量信息generate_subtasks,如下所示(仅显示部分输出):

2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".

Run Code Online (Sandbox Code Playgroud)

哦,还有一件很酷的事情:Dagster 执行类型检查,如果你给它输入错误的参数,它会很快失败。因此,如果我们要向print_strmap函数提供 ,它甚至会拒绝运行。