我有一个由两个固体组成的 Dagster 管道(下面是可重现的示例)。第一个 ( return_some_list) 输出一些对象的列表。第二个实体 ( print_num) 接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理。
我该如何为第一个实体返回的列表中的每个元素调用第二个实体?还请解释任何最佳实践。
不确定这是否是最好的方法(让我知道),但我想print_num为第一个实体输出的每个元素生成一个不同的实体实例。这将帮助我将来并行化实体并更好地处理长/计算密集型实体。
from dagster import execute_pipeline, pipeline, solid
@solid
def return_some_list(context):
return [1,2,3,4,5]
@solid
def print_num(context, some_num: int):
print(some_num)
return some_num
@pipeline
def some_pipeline():
output_list = return_some_list()
for some_num in output_list:
print_num(some_num)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
Run Code Online (Sandbox Code Playgroud)