标签: aws-step-functions

编排AWS lambda函数

背景

我有一个 API 网关端点,它代理 Lambda 函数 (Lambda A),供我的 React 应用程序获取客户数据。

此 lambda 函数进行 API 调用来获取客户数据,但响应的格式还有很多不足之处。所以我想重新格式化它。

我没有将这种重新格式化逻辑填充到 Lambda A 中,而是编写了一个单独的 Lambda 函数 (Lambda B)。当我的 API 网关端点被命中时,我需要调用这两个函数,第一个函数的输出是第二个函数的输入。

第一个想法:阶跃函数

Step 函数看起来很自然,但可以在阶段之间传递的数据有效负载的大小有 32kb 的限制。我们的 json blob 客户数据经常超出此范围。

我听说针对这种情况提供的唯一“最佳实践”是将有效负载写入 S3,然后将对象密钥传递到下一阶段。

这很好,但我对必须向 S3 写入和删除如此多的短期对象感到不高兴。每天可能有数十或数十万个此类请求。所以我(暂时)放弃了阶跃函数方法。

目前的方法

我目前正在使用 javascript SDK 直接从 Lambda A 调用 Lambda B。这有相当多的缺点;值得注意的是,我有时会同时运行两个 lambda,但没有任何性能优势。换句话说,我付钱让 Lambda A 坐在那里等待 Lambda B 的响应(我也付钱)。

这感觉像是一种反模式,而且我听说过它具有这样的特征。

问题

这似乎是一个相对常见的场景 - 进行 API 调用(函数 A),然后执行一些附加逻辑来补充、重新格式化或以其他方式修改该响应(函数 B),然后将其传回调用者。

当然,我不是第一个想要使用两个 Lambda 函数来做这样的事情的人。

  • 假设我不能使用步骤函数,我可以选择使用两个 lambda 函数执行此操作吗?

  • 除了使用 S3 之外,还有其他方法可以解决 Step Functions 的 32kb 有效负载大小限制吗?

  • 如果我愚蠢地想避免使用 S3/Step …

javascript amazon-web-services aws-lambda aws-api-gateway aws-step-functions

5
推荐指数
1
解决办法
584
查看次数

AWS Step Function 句柄 Lambda.Unknown

我有一个简单的 AWS 状态机,其中有两个执行 C# lambda 函数的任务状态,以及一个传递状态错误处理程序来处理“States.ALL”:

{
  "Comment": "StateMachine1",
  "StartAt": "step1",
  "States": {
    "step1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-2:0000:function:step1",
      "Catch": [ {
            "ErrorEquals": ["States.ALL"],
            "Next": "CatchAllFallback"
         } ],
      "Next": "step2"
    },
      "step2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-2:0000:function:step2",
        "Catch": [ {
            "ErrorEquals": ["States.ALL"],
            "Next": "CatchAllFallback"
         } ],
      "End": true
    },
     "CatchAllFallback": {
         "Type": "Pass",
         "Result": "This is a fallback from any error code",
         "End": true
      }
  }
}
Run Code Online (Sandbox Code Playgroud)

当其中一个步骤失败时,我会得到以下内容作为“CatchAllFallback”的输入:

"Error": "Lambda.Unknown",
"Cause": "The cause could not be determined because Lambda did …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-lambda aws-step-functions

5
推荐指数
1
解决办法
8752
查看次数

AWS Stepfunctions 结果路径 - 附加到 JSON 而不是嵌套

目前,我正在尝试创建一系列 Lambda,这些 Lambda 将从 StepFunctions 输入执行给定的特定负载。我一切正常;然而,这并不如我所愿。

我终于掌握了InputPath、ResultPath和OutputPath之间的区别。我现在遇到的唯一问题是允许 ResultPath “追加”返回的 JSON,而不是将其嵌套在有效负载中。

这是状态机:

{
  "StartAt": "GetDailyEmails",
  "States": {
    "GetDailyEmails": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:DailyEmailExtractor",
      "InputPath": "$.GetDailyEmailsInputs",
      "ResultPath": "$.TransformEmailsToCSVInputs.GetDailyEmailsResults",
      "Next": "TransformEmailsToCSV"
    },
    "TransformEmailsToCSV": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:EmailTransform",
      "InputPath": "$.TransformEmailsToCSVInputs",
      "End": true
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我提供的输入:

{
    "GetDailyEmailsInputs": {
        "secret_name": "email_password",
        "subject_contains": "stuff",
        "json_output_file_name": "test_emails",
        "bucket_name": "emails"
    },
    "TransformEmailsToCSVInputs": {
        "csv_output_file_name": "email_errors"
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我收到的输出:

{
  "GetDailyEmailsInputs": {
    "secret_name": "email_password",
    "subject_contains": "stuff",
    "json_output_file_name": "test_emails",
    "bucket_name": "emails"
  },
  "TransformEmailsToCSVInputs": {
    "csv_output_file_name": "apex_errors",
    "GetDailyEmailsResults": { …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services aws-lambda aws-step-functions

5
推荐指数
1
解决办法
5125
查看次数

如何将 JSON 传递到 AWS StepFunction 中的 ECS 任务?

我正在尝试创建一个AWS StepFunctions工作流程,其中有一个Lambda任务,后跟一个ECS/Fargate任务。

Lambda 将 ID 作为输入,并以 JSON 形式输出一些数据,供 ECS 任务使用,该任务在其容器环境中运行 Python 脚本。我想在 StepFunctions 中执行以下流程:

{ id: 1234 } -> [Lambda] -> { id: 1234, data: {...} }

{ id: 1234, data: {...} } -> [ECS] -> { id: 1234, result: "bar"}
Run Code Online (Sandbox Code Playgroud)

作为参考,以下是 ECS 任务的示例配置: https://docs.aws.amazon.com/step-functions/latest/dg/sample-project-container-task-notification.html

我无法找到任何方法将inputECS 任务的结构化 JSON 传递到运行该任务的容器。

以下是我迄今为止发现的内容:

  • 我可以使用 JSONPath 选择输入的各个字段并将它们设置为环境变量,从而将 JSON 输入的各个字段传递到容器。但是,如果我将整个input对象 ( $) 分配给环境变量,则它会在运行时失败并出现序列化错误 ( [Object] cannot be converted to a string)。
  • 我可以创建一个中间 lambda,它接受输入并将其转换为 …

amazon-web-services amazon-ecs aws-step-functions

5
推荐指数
1
解决办法
7606
查看次数

配置启动模板时,AWS Batch 作业卡在 RUNNABLE 状态

我已使用AWS Batch Jobs配置了 Step Function 。所有配置都运行良好,但我需要自定义启动实例。为此,我使用启动模板服务并根据AWS Batch配置中使用的实例类型构建简单(空)配置。当使用Launch Template构建计算环境时,批处理作业卡在RUNNABLE阶段。当我在没有启动模板的情况下运行AWS Batch Job时,一切正常。午餐实例表单模板也可以正常工作。谁能给我任何错误或遗漏的建议?以下是整个堆栈元素的定义。

启动模板定义 在此输入图像描述

计算环境详细信息概述

Compute environment name senet-cluster-r5ad-2xlarge-v3-4
Compute environment ARN arn:aws:batch:eu-central-1:xxxxxxxxxxx:compute-environment/senet-cluster-r5ad-2xlarge-v3-4
ECS Cluster name arn:aws:ecs:eu-central-1:xxxxxxxxxxxx:cluster/senet-cluster-r5ad-2xlarge-v3-4_Batch_3323aafe-d7a4-3cfe-91e5-c1079ee9d02e
Type MANAGED
Status VALID
State ENABLED
Service role arn:aws:iam::xxxxxxxxxxx:role/service-role/AWSBatchServiceRole
Compute resources
Minimum vCPUs 0
Desired vCPUs 0
Maximum vCPUs 25
Instance types r5ad.2xlarge
Allocation strategy BEST_FIT
Launch template lt-023ebdcd5df6073df
Launch template version $Default
Instance rolearn:aws:iam::xxxxxxxxxxx:instance-profile/ecsInstanceRole
Spot fleet role
EC2 Keypair …
Run Code Online (Sandbox Code Playgroud)

templates launch amazon-web-services aws-step-functions aws-batch

5
推荐指数
1
解决办法
1586
查看次数

AWS Step Functions 中的 Docker 映像

假设我有一个 AWS Step 函数,其状态是批处理作业,与 Docker 映像关联。

  1. 是否有更有效的方法将 Docker 映像与状态关联起来,而不是使用批处理作业 ( arn:aws:states:::batch:submitJob.sync)?创建活动看起来太复杂了。

  2. 除了基于 AWS 的文件存储、对象数据库、在单独实例上共享的 Docker 卷之外,是否有任何现实的方法可以在这种“dockerized”状态之间交换数据?例如,在状态之间传输容器化应用程序的 STDOUT 内容?

  3. 如何将应用程序级别的错误从 Docker 容器转移(报告)到相应的状态以使其失效,将其标记为“失败”?这取决于应用程序的返回值吗?

amazon-web-services docker aws-step-functions aws-batch

5
推荐指数
1
解决办法
1920
查看次数

使用step函数运行多个fargate任务

我想使用 Step 函数运行 Fargate 任务的 5 个实例。

...
    "Fargate_task": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Count":5,
        "Cluster": "my_cluster",
        "TaskDefinition": "my_task_definition",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "subnet-1",
              "subnet-2"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "Next": "Next_task"
    },
...
Run Code Online (Sandbox Code Playgroud)

这是我提供的 Json,但我无法保存它,因为“Step Functions 不支持字段‘Count’”。

你知道该怎么做吗?

我按照这里的文档,其中写了关键“计数”......

感谢您的帮助

amazon-web-services amazon-ecs aws-step-functions aws-fargate

5
推荐指数
1
解决办法
1896
查看次数

有没有办法将数字转换为 DynamoDB 在 Step Functions 中期望的字符串?

我有一个物联网主题从设备接收数据。每个 IoT 负载都包含一些属性和对象数组,如下所示。

{
  "batchId": "someBatchId",
  "prop1": "someProp1",
  "objArray": [
    {
      "arrString1": "someArrString1",
      "arrString2": "someArrString2",
      "arrNum1": 1,
      "arrNum2": 2,
      "arrString3": "someArrString3"
    },
    {
      "arrString1": "someArrString4",
      "arrString2": "someArrString5",
      "arrNum1": 3,
      "arrNum2": 4,
      "arrString3": "someArrString6"
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

该数组中可以包含数百个对象。我们希望使用一个Map步骤来展平这些数据,并将顶级属性与数组中的每个元素相关联,并将该元素插入到 DynamoDB 中。我们的桌子设置和物联网主题运行得很好。

我们遇到的问题是 DynamoDB 在插入数字时需要字符串。但是,由于我们从 IoT 以 JSON 对象的形式接收这些数据,并且数字位于对象数组内部,因此我们很难将数字转换为字符串。因此,我们希望 Step Function 以某种方式将数字转换为字符串,但我不知道该怎么做。这里的目标是构建一个简单的管道,用于将 IoT 数据存储到 DynamoDB 中。

我们也不能完全控制可以发送的所有属性,因此我们还在 S3 中存储 IoT 有效负载的副本(它已经与 IoT 规则引擎连接并且工作得很好),但这更多的是备份和包罗万象。我们最感兴趣的是进入 DynamoDB 的数据,以便我们可以实际查询它。我们如何说服 Step Function 将 JSON 有效负载中的数字插入 DynamoDB 中?

amazon-dynamodb aws-step-functions

5
推荐指数
1
解决办法
3272
查看次数

CDK Step Functions - 如何创建循环

我正在尝试迁移使用 AWS 界面创建的 Step 函数,但在重现以下行为时遇到了问题:

根据条件,我希望任务 2 执行任务 3 并返回到任务 1,或者结束步骤函数。我的问题是图像上的红色路径 在此输入图像描述 这是我现在的代码:

sfn.Chain.start(OtherTaskWeDoNotCare)
  .next(task1)
  .next(
    new sfn.Choice(this, "task2").when(
      sfn.Condition.booleanEquals("$.isFinished", false),
      task3.next(task1) // This is not working
    )
  );
Run Code Online (Sandbox Code Playgroud)

希望可以有人帮帮我!提前致谢!

amazon-web-services typescript aws-step-functions aws-cdk

5
推荐指数
1
解决办法
6063
查看次数

如何使用 TaskToken 两次回调同一个步骤函数?

我使用带有 lambda 的步骤函数来使用 $$.Task.Token 和 SendTaskSuccess 进行回调。

第一次回调时一切正常。但如果我需要进行第二次回调,我会得到TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'.

我不确定是否应该保留 $$.Task.Token 中的第一个任务令牌或为每个回调生成一个新的任务令牌。但这似乎并不重要,因为我已经尝试过两者都做,并且无论哪种方式都得到相同的结果。

这不应该是可能的吗?有什么特别的事我必须做吗?

callback amazon-web-services aws-lambda aws-step-functions

5
推荐指数
1
解决办法
4198
查看次数