use*_*803 5 python kubeflow kubeflow-pipelines
我在动态列表上使用 ParallelFor。我想收集循环中的所有输出,并将它们传递给另一个 ContainerOp。
类似于以下内容,这显然不起作用,因为outputs列表将是静态的。
with dsl.ParallelFor(op1.output) as item:
op2 = dsl.ContainerOp(
name='op2',
...
file_outputs={
'outputs': '/outputs.json',
})
outputs.append(op2.output)
op3 = dsl.ContainerOp(
name='op3',
...
arguments=['--input': outputs] # won't work
)
Run Code Online (Sandbox Code Playgroud)
不幸的是,Ark-kun 的解决方案对我不起作用。但是,如果我们提前知道输入的数量,则有一种简单的方法可以实现扇入工作流程。我们可以像这样预先计算管道 DAG:
@kfp.components.create_component_from_func
def my_transformer_op(item: str) -> str:
return item + "_NEW"
@kfp.components.create_component_from_func
def my_aggregator_op(items: list) -> str:
return "HELLO"
def pipeline(array_of_arguments):
@dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
def dynamic_pipeline():
outputs = []
for i in array_of_arguments:
outputs.append(my_transformer_op(str(i)).output)
my_aggregator_op(outputs)
return dynamic_pipeline
...
run_id = client.create_run_from_pipeline_func(
pipeline(data_samples_chunks), {},
run_name=PIPELINE_RUN,
experiment_name=PIPELINE_EXPERIMENT).run_id
Run Code Online (Sandbox Code Playgroud)
问题在于op3没有正确引用输出op2作为输入参数。尝试这个:
op3 = dsl.ContainerOp(
...
arguments=['--input': op2.outputs['outputs']]
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1779 次 |
| 最近记录: |