我有一个物联网主题从设备接收数据。每个 IoT 负载都包含一些属性和对象数组,如下所示。
{
"batchId": "someBatchId",
"prop1": "someProp1",
"objArray": [
{
"arrString1": "someArrString1",
"arrString2": "someArrString2",
"arrNum1": 1,
"arrNum2": 2,
"arrString3": "someArrString3"
},
{
"arrString1": "someArrString4",
"arrString2": "someArrString5",
"arrNum1": 3,
"arrNum2": 4,
"arrString3": "someArrString6"
}
]
}
Run Code Online (Sandbox Code Playgroud)
该数组中可以包含数百个对象。我们希望使用一个Map步骤来展平这些数据,并将顶级属性与数组中的每个元素相关联,并将该元素插入到 DynamoDB 中。我们的桌子设置和物联网主题运行得很好。
我们遇到的问题是 DynamoDB 在插入数字时需要字符串。但是,由于我们从 IoT 以 JSON 对象的形式接收这些数据,并且数字位于对象数组内部,因此我们很难将数字转换为字符串。因此,我们希望 Step Function 以某种方式将数字转换为字符串,但我不知道该怎么做。这里的目标是构建一个简单的管道,用于将 IoT 数据存储到 DynamoDB 中。
我们也不能完全控制可以发送的所有属性,因此我们还在 S3 中存储 IoT 有效负载的副本(它已经与 IoT 规则引擎连接并且工作得很好),但这更多的是备份和包罗万象。我们最感兴趣的是进入 DynamoDB 的数据,以便我们可以实际查询它。我们如何说服 Step Function 将 JSON 有效负载中的数字插入 DynamoDB 中?
对于 StepFunctions,我们是否可以两者兼而有之Retry,并Catch在穷尽的情况下共同努力?
这是我的用例
"ExecuteMyJob": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName.$": "$.jobName",
"Arguments.$": "$.jobArguments"
},
"Retry" : [{
"ErrorEquals": [ "States.TaskFailed", "States.Runtime" ],
"MaxAttempts": 3,
"IntervalSeconds": 60,
"BackoffRate": 2
}],
"Catch": [{
"ErrorEquals": [ "States.ALL" ],
"Next": "MarkJobFailOnDbTable"
}],
"Next": "NextJobOnPreviousSuccess"
}
Run Code Online (Sandbox Code Playgroud) 我已经在 Terraform 中定义了 StepFunction 状态机的创建,现在我想设置一个计时器来每天触发状态机,我认为使用 cloudwatch 事件规则可能是一个不错的选择,我知道如何设置事件规则来触发 Lambda :
resource "aws_cloudwatch_event_rule" "lambda_event_rule" {
name = xxx
schedule_expression = xxx
description = xxx
}
resource "aws_cloudwatch_event_target" "lambda_event_target" {
target_id = xxx
rule = aws_cloudwatch_event_rule.lambda_event_rule.name
arn = xxx
}
#I must setup the right permissions using 'aws_lambda_permission'
#see: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target
resource "aws_lambda_permission" "lambda_event_permission" {
statement_id = xxx
action = "lambda:InvokeFunction"
function_name = xxx
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.lambda_event_rule.name
}
Run Code Online (Sandbox Code Playgroud)
但如何设置触发状态机的权限部分?我找不到任何关于它的例子,我错过了什么吗?是因为我们不需要状态机的权限配置吗?有人可以帮忙吗?
以下是到目前为止我使用 cloudwatch 事件规则触发状态机的内容:
resource "aws_cloudwatch_event_rule" "step_function_event_rule" {
name = xxx
schedule_expression = …Run Code Online (Sandbox Code Playgroud) state-machine amazon-web-services terraform aws-step-functions amazon-cloudwatch-events
Step Functions 现在支持回调功能以支持手动批准。我想知道任务令牌是如何生成的,以及我们是否可以传递自己的任务令牌字符串,以便我们不需要存储它来标记任务通过/失败。
此外,对于需要多个客户端交互才能进入下一个状态的工作流程,建议使用 Step Functions 或 SWF(及其信号)。
用例:我在工作流程中有多个步骤,如果计时器达到 6 个月,或者如果在这 6 个月内用户实际批准,则我们需要执行失败场景,那么工作流程需要执行到通过的场景。
在我的步骤函数中,我想执行 Athena 查询。我能够定义一个步骤并成功执行查询。但是,我想传递一些参数作为输入并在查询字符串中使用它们。例如。
假设我的查询字符串是:
select * from <Data Source>.<database>.<tablename> where partition_0 = '2021';
我希望能够将年份作为输入 json 传递给步骤函数,例如:
{
"YYYY": 2021
}
Run Code Online (Sandbox Code Playgroud)
是否可以在查询字符串中插入输入“YYYY”?如果是这样,怎么办?
步骤函数配置示例:
{
"Comment": "Start athena exececution",
"StartAt": "athena",
"States": {
"athena": {
"Type": "Task",
"InputPath": "$",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "select * from mycatalog.mydatabase.mytable where partition_0 = '2021'",
"WorkGroup": "primary",
"ResultConfiguration": {
"OutputLocation": "s3://mys3bucket"
}
},
"Next": "Pass"
},
"Pass": {
"Comment": "A Pass state passes its input to its output, without performing work. Pass states are useful …Run Code Online (Sandbox Code Playgroud) 在 AWS Step Functions 中,我们可以使用以下语法将之前步骤中的参数用作输入。
"Parameters": {
"Details": {
"weight.$": "$.product.weight",
"unit": "grams"
}
}
Run Code Online (Sandbox Code Playgroud)
举个例子,说product.weight.是500。我可以配置一个将接收值的参数500 grams吗?
例如,这样的事情:
"Parameters": {
"Details": {
"formatted_weight.$": "{$.product.weight} grams",
}
}
Run Code Online (Sandbox Code Playgroud) 我目前正在创建一个 Step Functions 工作流程,其中包含一些 Lambda 和其他在一个任务与其他任务之间共享状态的服务。
我主要是在 Lambda 上执行一些外部请求,并在任务之间共享来处理这些数据。
当我尝试测试 Step Functions 工作流程时,收到以下错误:States.DataLimitExceeded,但当我在 Lambda 控制台上测试时,一切正常。
错误:
谁能帮助我了解我在 Step Functions 工作流程中做错了什么?
如何等待 CodeBuild 在 Step Functions 状态机内完成?
我现在能想到的唯一等待是使用循环和计时器,就像下面的流程一样,但这是唯一的方法吗?没有“WaitForBuild”操作或类似的操作吗?
拼写错误:BatchGetBuildBatches 应该是 BatchGetBuilds。
我有一个由 aws 步骤函数中的状态调用的外部服务,现在我想从这个外部服务发送“sendTaskSuccess”到特定的 aws 步骤机器。我怎样才能做到这一点。我知道我们可以从 lambda 调用“sendSuccessToken”,但想知道如何在没有 lambda 的情况下使用“sendSuccessToken”。
假设我有一个状态,其输入是
{'myList': ['foo', 'bar']}
Run Code Online (Sandbox Code Playgroud)
其结果是hello,我想将结果附加到列表中,以便状态的输出变为
{'myList': ['foo', 'bar', 'hello']}
Run Code Online (Sandbox Code Playgroud)
我实际上想对几个连续的状态执行此操作,以便每个状态将其结果附加到存储在输入对象中的列表中。