小编cya*_*yau的帖子

Dagster 循环实体的输出和并发处理

我有一个由两个固体组成的 Dagster 管道(下面是可重现的示例)。第一个 ( return_some_list) 输出一些对象的列表。第二个实体 ( print_num) 接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理。

我该如何为第一个实体返回的列表中的每个元素调用第二个实体?还请解释任何最佳实践。

不确定这是否是最好的方法(让我知道),但我想print_num为第一个实体输出的每个元素生成一个不同的实体实例。这将帮助我将来并行化实体并更好地处理长/计算密集型实体。

from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
Run Code Online (Sandbox Code Playgroud)

python concurrency pipeline dagster

5
推荐指数
1
解决办法
2924
查看次数

标签 统计

concurrency ×1

dagster ×1

pipeline ×1

python ×1