kubeflow 管道动态输出列表作为输入参数

use*_*803 5 python kubeflow kubeflow-pipelines

我在动态列表上使用 ParallelFor。我想收集循环中的所有输出,并将它们传递给另一个 ContainerOp。
类似于以下内容,这显然不起作用,因为outputs列表将是静态的。

with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
      name='op2',
      ...
      file_outputs={
         'outputs': '/outputs.json',
    })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
   name='op3',
   ...
   arguments=['--input': outputs]  # won't work
)
Run Code Online (Sandbox Code Playgroud)

Ezh*_*hik 5

不幸的是,Ark-kun 的解决方案对我不起作用。但是,如果我们提前知道输入的数量,则有一种简单的方法可以实现扇入工作流程。我们可以像这样预先计算管道 DAG:

@kfp.components.create_component_from_func
def my_transformer_op(item: str) -> str:
    return item + "_NEW"


@kfp.components.create_component_from_func
def my_aggregator_op(items: list) -> str:
    return "HELLO"


def pipeline(array_of_arguments):
    @dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
    def dynamic_pipeline():
        outputs = []
        for i in array_of_arguments:
            outputs.append(my_transformer_op(str(i)).output)
        my_aggregator_op(outputs)
    return dynamic_pipeline

...

    run_id = client.create_run_from_pipeline_func(
        pipeline(data_samples_chunks), {},
        run_name=PIPELINE_RUN,
        experiment_name=PIPELINE_EXPERIMENT).run_id
Run Code Online (Sandbox Code Playgroud)

管道图


Eka*_*ong 0

问题在于op3没有正确引用输出op2作为输入参数。尝试这个:

op3 = dsl.ContainerOp(
    ...
    arguments=['--input': op2.outputs['outputs']]
)
Run Code Online (Sandbox Code Playgroud)