Argo 工作流程中的动态“扇入”

mpa*_*tek 6 workflow directed-acyclic-graphs kubernetes argo-workflows

Argo 允许根据先前步骤的输出动态生成并行工作流程步骤。

这里提供了动态工作流程生成的示例: https: //github.com/argoproj/argo-workflows/blob/master/examples/loops-param-result.yaml

我正在尝试创建一个类似的工作流程,其中包含最终的“扇入”步骤,该步骤将从动态创建的并行步骤中读取输出。这是一个尝试:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters.number}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]
Run Code Online (Sandbox Code Playgroud)

我能够提交此工作流程,并且它运行成功。不幸的是,最后一步的输出fan-in如下所示:

fan-in: received {{steps.write.outputs.parameters.number}}
Run Code Online (Sandbox Code Playgroud)

未对输入参数的值numbers进行插值。关于如何让它发挥作用有什么想法吗?

Mic*_*haw 6

聚合步骤输出参数可通过 访问steps.STEP-NAME.outputs.parameters。无法按名称访问一个参数的一组聚合输出。

对工作流程的这一微小改变应该可以满足您的需求:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]
Run Code Online (Sandbox Code Playgroud)

唯一的变化是.number从 中删除{{steps.write.outputs.parameters.number}}

这是新的输出:

received [{number:20},{number:21},{number:22},{number:23},{number:24},{number:25},{number:26},{number:27},{number:28},{number:29},{number:30}]
Run Code Online (Sandbox Code Playgroud)

这是GitHub 问题,其中讨论/创建了输出参数聚合

我提出了一个增强提案,用于按名称访问聚合输出参数。