在 Step Function 内循环

Aji*_*oel 11 amazon-web-services aws-lambda aws-step-functions

我试图在循环中调用步骤函数中的几个步骤,但我无法弄清楚我需要如何执行此操作。这是我现在所拥有的:我需要添加另一个 lambda 函数(GetReviews),然后它将循环调用 CreateReview、SendNotification。我该怎么做呢?我指的是“使用 Lambda 迭代循环”文档,它表明这是可能的。

阶跃函数定义:

{
  "Comment": "Scheduling Engine",
  "StartAt": "CreateReview",
  "States": {
    "CreateReview": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:.........:function:CreateReview",
      "Next": "CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateReviewResult",
      "OutputPath": "$"      
    },
    "CreateNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:.........:function:CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateNotificationResult",
      "OutputPath": "$",
      "End": true
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

fed*_*nev 12

下面提供了三个选项。这是一个直观的总结:

循环选项

#1 Map:对数组的每个元素重复一组步骤(没有循环,可以同时进行)

当您想要为输入数组的每个元素运行一组步骤时,映射状态是循环的替代方法。默认情况下,每个元素并行运行。设置MaxConcurrency: 1为模仿 @NunoGamaFreire 基于循环的解决方案的串行执行。

{
  "StartAt": "MockArray",
  "States": {
    "MockArray": {
      "Type": "Pass",
      "Result": [ { "name": "Zaphod" }, { "name": "Arthur" }, { "name": "Trillian" } ],
      "ResultPath": "$.Items",
      "Next": "MapState"
    },
    "MapState": {
      "Type": "Map",
      "ResultPath": "$.MapResult",
      "End": true,
      "Iterator": {
        "StartAt": "MockWork",
        "States": {
          "MockWork": {
            "Type": "Pass",
            "Parameters": {
              "output.$": "States.Format('Hello, {}!', $.name)"
            },
            "OutputPath": "$.output",
            "End": true
          }
        }
      },
      "ItemsPath": "$.Items"
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

为 中的每个元素生成一个输出MockArray,同时处理:

"MapResult": [ "Hello, Zaphod!", "Hello, Arthur!", "Hello, Trillian!" ]
Run Code Online (Sandbox Code Playgroud)

#2 重复一组步骤 X 次(没有 lambda 的循环,串行)

此选项涉及正确的循环!连续重复一组任务 X 次,直到选择状态确定递增计数器变量已达到 X。使用新的States.MathAdd 内部函数无需 Lambda 即可递增。此选项适用于自定义重试逻辑或您可能希望尽早中断循环的其他情况。

{
  "StartAt": "InitializeCounter",
  "States": {
    "InitializeCounter": {
      "Type": "Pass",
      "Comment": "Initialize the counter at 0.  Move all inputs to Payload.Input",
      "Parameters": {
        "Counter": 0
      },
      "Next": "IncrementCounter"
    },
    "IncrementCounter": {
      "Type": "Pass",
      "Comment": "Increment the Counter by 1",
      "Parameters": {
        "Counter.$": "States.MathAdd($.Counter, 1)"
      },
      "Next": "MockWork"
    },
    "MockWork": {
      "Type": "Pass",
      "Comment": "Simulate some work.  Optionally break early from the loop with ExitNow: true",
      "Result": false,
      "ResultPath": "$.ExitNow",
      "Next": "Loop?"
    },
    "Loop?": {
      "Type": "Choice",
      "Choices": [{ "Or": [
         { "Variable": "$.Counter", "NumericGreaterThanEqualsPath": "$$.Execution.Input.workCount" },
         { "Variable": "$.ExitNow", "BooleanEquals": true } ],
          "Next": "Success"
        }
      ],
      "Default": "IncrementCounter"
    },
    "Success": {
      "Type": "Succeed"
    }
  },
  "TimeoutSeconds": 3
}
Run Code Online (Sandbox Code Playgroud)

Counter每个循环迭代一次。如果任务返回,则循环中断ExitNow: true

{ "Counter": 4, "ExitNow": false }
Run Code Online (Sandbox Code Playgroud)

#3 重复一组步骤 X 次(无循环,*同时*)

此选项是前两个选项的混合。与 #2 一样,我们从输入中的所需迭代次数开始$.workCount。与 #1 一样,我们同时映射一个数组。不过,这一次,状态机使用另一个内在函数来创建数组。States.ArrayRange(1, $.workCount, 1)

{
  "StartAt": "Iterations",
  "States": {
    "Iterations": {
      "Type": "Pass",
      "Parameters": {
        "Iterations.$": "States.ArrayRange(1, $.workCount, 1)"
      },
      "Next": "MapState"
    },
    "MapState": {
      "Type": "Map",
      "ResultPath": "$.MapResult",
      "End": true,
      "Iterator": {
        "StartAt": "MockWork",
        "States": {
          "MockWork": {
            "Type": "Pass",
            "Parameters": {
              "output.$": "States.Format('Hello from iteration #{}', States.JsonToString($))"
            },
            "OutputPath": "$.output",
            "End": true
          }
        }
      },
      "ItemsPath": "$.Iterations"
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

任务同时运行,对 中的每个项目运行一次Iterations

"Iterations": [ 1, 2, 3, 4 ],
"MapResult": [  "Hello from iteration #1",  "Hello from iteration #2",  "Hello from iteration #3",  "Hello from iteration #4" ]
Run Code Online (Sandbox Code Playgroud)


小智 8

我对这个答案做出了贡献,因为我使用了一种稍微不同的方法来能够在步骤函数内部循环,而不必依赖 lambda 来递增。如果将来有人需要通用解决方案,这可以是一个很好的参考。这是带有代码的示例:

在此输入图像描述

{
"Comment": "A description of my state machine",
"StartAt": "InitVariables",
"States": {
    "InitVariables": {
        "Type": "Pass",
        "Parameters": {
            "index": 0,
            "incrementor": 1,
            "ArrayLength.$": "States.ArrayLength($.inputArray)"
        },
        "ResultPath": "$.iterator",
        "Next": "LoopChoice"
    },
    "LoopChoice": {
        "Type": "Choice",
        "Choices": [
            {
                "Variable": "$.iterator.ArrayLength",
                "NumericGreaterThanPath": "$.iterator.index",
                "Next": "IncrementVariable"
            }
        ],
        "Default": "End"
    },
    "IncrementVariable": {
        "Type": "Pass",
        "Parameters": {
            "index.$": "States.MathAdd($.iterator.index, $.iterator.incrementor)",
            "incrementor": 1,
            "ArrayLength.$": "$.iterator.ArrayLength"
        },
        "ResultPath": "$.iterator",
        "Next": "LoopChoice"
    },
    "End": {
        "Type": "Pass",
        "End": true
    }
} }
Run Code Online (Sandbox Code Playgroud)

这是循环的基础,我使用States.MathAdd($.iterator.index, $.iterator.incrementor)内部函数添加两个值,在本例中,使用 initVariables 状态中定义的增量来增加索引。并获取我想要循环的数组的长度。您还可以使用内部函数 来获取数组长度States.ArrayLength("$.path.to.array")。该数组在输入中传递。要获取数组的值,我们可以使用内部函数States.ArrayGetItem($.inputArray, $.iterator.index)。所有自定义逻辑都应该放在loopChoice状态和IncrementVariable状态之间。

希望这对将来的人有帮助。


ElF*_*itz 4

这么晚才回复很抱歉。你可能已经在中间解决了这个问题,但你在这里

因此,在 Step Functions 中循环时,我只需添加一个选择状态(请参阅选择状态规则)。

您的状态之一需要输出您是否已完成循环,或者迭代的项目数和项目总数。

在第一种情况下,它会是这样的

{
  "Comment": "Scheduling Engine",
  "StartAt": "CreateReview",
  "States": {
    "GetReviews": {
       whatever
       "Next": "LoopChoiceState"
    },
    "LoopChoiceState": {
      "Type" : "Choice",
      "Choices": [
       {
          "Variable": "$.loopCompleted",
          "BooleanEquals": false,
          "Next": "GetReviews"
        }
      ],
      "Default": "YourEndState"
    },
    "CreateReview": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-westz2:529627678433:function:CreateReview",
      "Next": "CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateReviewResult",
      "OutputPath": "$"      
    },
    "CreateNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:529627678433:function:CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateNotificationResult",
      "OutputPath": "$",
      "End": true
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

第二种情况:

{
  "Comment": "Scheduling Engine",
  "StartAt": "CreateReview",
  "States": {
    "GetReviews": {
       whatever
       "Next": "LoopChoiceState"
    },
    "LoopChoiceState": {
       "Type" : "Choice",
       "Choices": [
         {
           "Variable": "$.iteratedItemsCount",
           "NumericEquals": "$.totalItemsCount",
           "Next": "CreateNotification"
         }
     ],
     "Default": "CreateReview"
    },
    "CreateReview": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:529627678433:function:CreateReview",
      "Next": "CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateReviewResult",
      "OutputPath": "$"      
    },
    "CreateNotification": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:529627678433:function:CreateNotification",
      "InputPath": "$",
      "ResultPath": "$.CreateNotificationResult",
      "OutputPath": "$",
      "End": true
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

您还可以使用索引(当前索引和最后一个索引)而不是迭代的项目数;它将帮助您跟踪您在哪里创建评论。