标签: aws-step-functions

编排AWS lambda函数

背景

我有一个 API 网关端点,它代理 Lambda 函数 (Lambda A),供我的 React 应用程序获取客户数据。

此 lambda 函数进行 API 调用来获取客户数据,但响应的格式还有很多不足之处。所以我想重新格式化它。

我没有将这种重新格式化逻辑填充到 Lambda A 中,而是编写了一个单独的 Lambda 函数 (Lambda B)。当我的 API 网关端点被命中时,我需要调用这两个函数,第一个函数的输出是第二个函数的输入。

第一个想法:阶跃函数

Step 函数看起来很自然,但可以在阶段之间传递的数据有效负载的大小有 32kb 的限制。我们的 json blob 客户数据经常超出此范围。

我听说针对这种情况提供的唯一“最佳实践”是将有效负载写入 S3,然后将对象密钥传递到下一阶段。

这很好,但我对必须向 S3 写入和删除如此多的短期对象感到不高兴。每天可能有数十或数十万个此类请求。所以我(暂时)放弃了阶跃函数方法。

目前的方法

我目前正在使用 javascript SDK 直接从 Lambda A 调用 Lambda B。这有相当多的缺点;值得注意的是,我有时会同时运行两个 lambda,但没有任何性能优势。换句话说,我付钱让 Lambda A 坐在那里等待 Lambda B 的响应(我也付钱)。

这感觉像是一种反模式,而且我听说过它具有这样的特征。

问题

这似乎是一个相对常见的场景 - 进行 API 调用(函数 A),然后执行一些附加逻辑来补充、重新格式化或以其他方式修改该响应(函数 B),然后将其传回调用者。

当然,我不是第一个想要使用两个 Lambda 函数来做这样的事情的人。

  • 假设我不能使用步骤函数,我可以选择使用两个 lambda 函数执行此操作吗?

  • 除了使用 S3 之外,还有其他方法可以解决 Step Functions 的 32kb 有效负载大小限制吗?

  • 如果我愚蠢地想避免使用 S3/Step …

javascript amazon-web-services aws-lambda aws-api-gateway aws-step-functions

5
推荐指数
1
解决办法
584
查看次数

AWS Step Function 句柄 Lambda.Unknown

我有一个简单的 AWS 状态机,其中有两个执行 C# lambda 函数的任务状态,以及一个传递状态错误处理程序来处理“States.ALL”:

{
  "Comment": "StateMachine1",
  "StartAt": "step1",
  "States": {
    "step1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-2:0000:function:step1",
      "Catch": [ {
            "ErrorEquals": ["States.ALL"],
            "Next": "CatchAllFallback"
         } ],
      "Next": "step2"
    },
      "step2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-2:0000:function:step2",
        "Catch": [ {
            "ErrorEquals": ["States.ALL"],
            "Next": "CatchAllFallback"
         } ],
      "End": true
    },
     "CatchAllFallback": {
         "Type": "Pass",
         "Result": "This is a fallback from any error code",
         "End": true
      }
  }
}
Run Code Online (Sandbox Code Playgroud)

当其中一个步骤失败时,我会得到以下内容作为“CatchAllFallback”的输入:

"Error": "Lambda.Unknown",
"Cause": "The cause could not be determined because Lambda did …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-lambda aws-step-functions

5
推荐指数
1
解决办法
8752
查看次数

AWS Stepfunctions 结果路径 - 附加到 JSON 而不是嵌套

目前,我正在尝试创建一系列 Lambda,这些 Lambda 将从 StepFunctions 输入执行给定的特定负载。我一切正常;然而,这并不如我所愿。

我终于掌握了InputPath、ResultPath和OutputPath之间的区别。我现在遇到的唯一问题是允许 ResultPath “追加”返回的 JSON,而不是将其嵌套在有效负载中。

这是状态机:

{
  "StartAt": "GetDailyEmails",
  "States": {
    "GetDailyEmails": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:DailyEmailExtractor",
      "InputPath": "$.GetDailyEmailsInputs",
      "ResultPath": "$.TransformEmailsToCSVInputs.GetDailyEmailsResults",
      "Next": "TransformEmailsToCSV"
    },
    "TransformEmailsToCSV": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:EmailTransform",
      "InputPath": "$.TransformEmailsToCSVInputs",
      "End": true
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我提供的输入:

{
    "GetDailyEmailsInputs": {
        "secret_name": "email_password",
        "subject_contains": "stuff",
        "json_output_file_name": "test_emails",
        "bucket_name": "emails"
    },
    "TransformEmailsToCSVInputs": {
        "csv_output_file_name": "email_errors"
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我收到的输出:

{
  "GetDailyEmailsInputs": {
    "secret_name": "email_password",
    "subject_contains": "stuff",
    "json_output_file_name": "test_emails",
    "bucket_name": "emails"
  },
  "TransformEmailsToCSVInputs": {
    "csv_output_file_name": "apex_errors",
    "GetDailyEmailsResults": { …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-lambda aws-step-functions

5
推荐指数
1
解决办法
5125
查看次数

如何将 JSON 传递到 AWS StepFunction 中的 ECS 任务?

我正在尝试创建一个AWS StepFunctions工作流程,其中有一个Lambda任务,后跟一个ECS/Fargate任务。

Lambda 将 ID 作为输入,并以 JSON 形式输出一些数据,供 ECS 任务使用,该任务在其容器环境中运行 Python 脚本。我想在 StepFunctions 中执行以下流程:

{ id: 1234 } -> [Lambda] -> { id: 1234, data: {...} }

{ id: 1234, data: {...} } -> [ECS] -> { id: 1234, result: "bar"}
Run Code Online (Sandbox Code Playgroud)

作为参考,以下是 ECS 任务的示例配置: https://docs.aws.amazon.com/step-functions/latest/dg/sample-project-container-task-notification.html

我无法找到任何方法将inputECS 任务的结构化 JSON 传递到运行该任务的容器。

以下是我迄今为止发现的内容:

  • 我可以使用 JSONPath 选择输入的各个字段并将它们设置为环境变量,从而将 JSON 输入的各个字段传递到容器。但是,如果我将整个input对象 ( $) 分配给环境变量,则它会在运行时失败并出现序列化错误 ( [Object] cannot be converted to a string)。
  • 我可以创建一个中间 lambda,它接受输入并将其转换为 …

amazon-web-services amazon-ecs aws-step-functions

5
推荐指数
1
解决办法
7606
查看次数

重试并捕获 AWS Step Function 任务

对于 StepFunctions,我们是否可以两者兼而有之Retry,并Catch在穷尽的情况下共同努力?

这是我的用例

  1. 作业失败
  2. 重试
  3. 重试耗尽,转至 Catch
  4. 捕获所有错误,移至下一个作业,并更新数据库表以标记此作业失败(另一项任务)
  5. 或者在第一次运行或重试成功后,转移到下一个作业
    "ExecuteMyJob": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {
                "JobName.$": "$.jobName",
                "Arguments.$": "$.jobArguments"
            },
            "Retry" : [{
                "ErrorEquals": [ "States.TaskFailed", "States.Runtime" ],
                "MaxAttempts": 3,
                "IntervalSeconds": 60,
                "BackoffRate": 2
            }],
            "Catch": [{
                "ErrorEquals": [ "States.ALL" ],
                "Next": "MarkJobFailOnDbTable"
            }],
            "Next": "NextJobOnPreviousSuccess"
        }
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-step-functions

5
推荐指数
1
解决办法
4409
查看次数

如何使用Terraform定义cloundwatch事件规则来触发StepFunction状态机

我已经在 Terraform 中定义了 StepFunction 状态机的创建,现在我想设置一个计时器来每天触发状态机,我认为使用 cloudwatch 事件规则可能是一个不错的选择,我知道如何设置事件规则来触发 Lambda :

resource "aws_cloudwatch_event_rule" "lambda_event_rule" {
  name                = xxx
  schedule_expression = xxx
  description         = xxx
}

resource "aws_cloudwatch_event_target" "lambda_event_target" {
  target_id = xxx
  rule      = aws_cloudwatch_event_rule.lambda_event_rule.name
  arn       = xxx
}

#I must setup the right permissions using 'aws_lambda_permission' 
#see: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target

resource "aws_lambda_permission" "lambda_event_permission" {
  statement_id  = xxx
  action        = "lambda:InvokeFunction"
  function_name = xxx
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.lambda_event_rule.name
}
Run Code Online (Sandbox Code Playgroud)

但如何设置触发状态机的权限部分?我找不到任何关于它的例子,我错过了什么吗?是因为我们不需要状态机的权限配置吗?有人可以帮忙吗?

以下是到目前为止我使用 cloudwatch 事件规则触发状态机的内容:

resource "aws_cloudwatch_event_rule" "step_function_event_rule" {
  name                = xxx
  schedule_expression = …
Run Code Online (Sandbox Code Playgroud)

state-machine amazon-web-services terraform aws-step-functions amazon-cloudwatch-events

5
推荐指数
1
解决办法
8464
查看次数

如何在AWS Step Functions中实现手动审批?SWF 和 Step Functions 之间推荐的工作流引擎是什么?

Step Functions 现在支持回调功能以支持手动批准。我想知道任务令牌是如何生成的,以及我们是否可以传递自己的任务令牌字符串,以便我们不需要存储它来标记任务通过/失败。

此外,对于需要多个客户端交互才能进入下一个状态的工作流程,建议使用 Step Functions 或 SWF(及其信号)。

用例:我在工作流程中有多个步骤,如果计时器达到 6 个月,或者如果在这 6 个月内用户实际批准,则我们需要执行失败场景,那么工作流程需要执行到通过的场景。

workflow amazon-web-services amazon-swf aws-step-functions

5
推荐指数
1
解决办法
4210
查看次数

如何等待 AWS ECS 任务处于 RUNNING 状态,然后再转到 AWS Step Function 中的下一步?

我有一个对象数组,我想将其迭代传递给 Lambda 函数。但是,我还需要为我启动的每个 Lambda 函数运行一个 ECS 任务。

我发现我需要一个 AWS Step Function,用于迭代 JSON 输入数组。对于每个输入,我必须启动一个 ECS 任务,等待它处于 RUNNING 状态,然后转到下一步,调用 Lambda 函数。就我而言,ECS 任务本身不返回任何内容。它应该保持运行,因为 Lambda 函数使用它。

目前,我拥有它,以便 ECS 任务启动,但它停留在启动 ECS 任务步骤,因为它不返回任何内容。在进入下一步之前,我如何才能等待它处于 RUNNING 状态?

当前阶跃函数定义:

{
  "StartAt": "Iterate",
  "States": {
    "Iterate": {
      "Type": "Map",
      "Iterator": {
        "StartAt": "Start ECS Task",
        "States": {
          "Start ECS Task": {
            "Type": "Task",
            "Resource": "arn:aws:states:::ecs:runTask",
            "Parameters": {
              "LaunchType": "FARGATE",
              "Cluster": "<cluster-arn>",
              "TaskDefinition": "<task-definition-arn>",
              "NetworkConfiguration": {
                "AwsvpcConfiguration": {
                  "Subnets": [
                    "<subnet-id>"
                  ],
                  "AssignPublicIp": "ENABLED"
                }
              }
            },
            "Next": "Invoke Lambda function" …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services amazon-ecs aws-lambda aws-step-functions

5
推荐指数
1
解决办法
5509
查看次数

具有动态环境变量的 AWS CDK StateMachine BatchSubmitJob

我正在尝试在 AWS CDK 中创建一个带有 BatchSubmitJob 的状态机,并在 BatchContainerOverrides 中使用动态环境变量。我在想这样的事情:

container_overrides = sfn_tasks.BatchContainerOverrides(
    environment={
        "TEST.$": "$.dynamic_from_payload"
    }
)

return sfn_tasks.BatchSubmitJob(self.scope,
                                id="id",
                                job_name="name",
                                job_definition_arn="arn",
                                job_queue_arn="arn",
                                container_overrides=container_overrides,
                                payload=sfn.TaskInput.from_object({
                                    "dynamic_from_payload.$": "$.input.some_variable"
                                }))
Run Code Online (Sandbox Code Playgroud)

但是,在部署时,CDK 会将“名称”和“值”添加到状态机定义中,但值现在是静态的。这是状态机定义的一部分,如控制台中所示:

"Environment": [
    {
        "Name": "TEST.$",
        "Value": "$.dynamic_from_payload"
    }
]
Run Code Online (Sandbox Code Playgroud)

但我需要这样:

"Environment": [
    {
        "Name": "TEST",
        "Value.$": "$.dynamic_from_payload"
    }
]
Run Code Online (Sandbox Code Playgroud)

我还尝试使用“Ref::”,如此处对命令参数所做的那样:AWS Step 和 Batch Dynamic Command。但这也行不通。

我还研究了逃生舱口,覆盖了 CloudFormation 模板。但我认为这不适用于此处,因为生成的状态机定义字符串基本上是一个大字符串。

我可以想到两种解决方案,但这两种解决方案都不让我高兴:用转义舱口覆盖状态机定义字符串,并使用其中“值”在某些条件下被替换的副本(可能使用正则表达式)或将 lambda 放入状态机中这将创建并触发批处理作业以及一个将轮询作业是否完成的 lambda。

长话短说:有人知道如何在 CDK 中将动态环境变量与 BatchSubmitJob 一起使用吗?

aws-step-functions aws-batch aws-cdk

5
推荐指数
1
解决办法
918
查看次数

AWS HTTP API 与 AWS Step Functions 集成 -&gt; 在输入中发送多个值

我有一个Type: AWS::Serverless::HttpApi正在尝试连接到 aType: AWS::Serverless::StateMachine作为触发器。这意味着 HTTP API 将触发 Step Function 状态机。

我只需指定一个输入就可以让它工作。例如,DefinitionBody当它工作时,看起来像这样:

      DefinitionBody:
        info:
          version: '1.0'
          title:
            Ref: AWS::StackName
        paths:
          "/github/secret":
            post:
              responses: 
                default:
                  description: "Default response for POST /"
              x-amazon-apigateway-integration:
                integrationSubtype: "StepFunctions-StartExecution"
                credentials:
                  Fn::GetAtt: [StepFunctionsApiRole, Arn]
                requestParameters:
                  Input: $request.body
                  StateMachineArn: !Ref SecretScannerStateMachine
                payloadFormatVersion: "1.0"
                type: "aws_proxy"
                connectionType: "INTERNET"
                timeoutInMillis: 30000
        openapi: 3.0.1
        x-amazon-apigateway-importexport-version: "1.0"
Run Code Online (Sandbox Code Playgroud)

请注意以下行:Input: $request.body。我只是指定$request.body.

但是,我需要能够发送$request.body$request.header.X-Hub-Signature-256。我需要将这两个值作为输入发送到我的状态机。

我尝试过很多不同的方法。例如:

Input: " { body: $request.body, header: $request.header.X-Hub-Signature-256 }" …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-lambda aws-api-gateway aws-step-functions aws-http-api

5
推荐指数
1
解决办法
720
查看次数