我正在尝试使用 AWS 的 Step Functions,我对它们实施长期运行的程序很感兴趣。我想为我的用户提供的一项功能是显示执行进度的可能性。使用describeExecution我可以验证某些执行是否仍在运行或已完成。但进度是一个合乎逻辑的衡量标准,Step Functions 本身无法告诉我这个过程还剩下多少。
为此,我需要自己提供逻辑。我可以测量状态机任务的进度,知道需要采取的总步数并计算已经采取的步数。我可以将此信息存储在机器运行时在步骤之间传递的机器状态中。但是如何使用 API 提取此状态?当然,我可以将这些信息存储在像 DynamoDb 这样的外部存储中,但这不是很优雅!
我正在尝试使用 CloudFormation 部署步进函数,并且我想从 S3 中的外部文件中引用实际的步进函数定义。
模板如下所示:
StepFunction1:
Type: "AWS::StepFunctions::StateMachine"
Properties:
StateMachineName: !Ref StepFunction1SampleName
RoleArn: !GetAtt StepFunctionExecutionRole.Arn
DefinitionString:
Fn::Transform:
Name: AWS::Include
Parameters:
Location:
Fn::Sub: 's3://${ArtifactsBucketName}/StepFunctions/StepFunction1/definition.json'
Run Code Online (Sandbox Code Playgroud)
但是,这似乎不受支持,因为我们收到错误
Property validation failure: [Value of property {/DefinitionString} does not match type {String}]
Run Code Online (Sandbox Code Playgroud)
我正在为 API 做类似的事情,从外部 swagger 文件中引用实际的 API 定义,这似乎工作正常。
例子:
SearchAPI:
Type: "AWS::Serverless::Api"
Properties:
Name: myAPI
StageName: latest
DefinitionBody:
Fn::Transform:
Name: AWS::Include
Parameters:
Location:
Fn::Sub: 's3://${ArtifactsBucketName}/ApiGateway/myAPI/swagger.yaml'
Run Code Online (Sandbox Code Playgroud)
我怎样才能使这项工作?
我有一个现有的 AWS Steps 编排,它通过 lambdas 执行 AWS Batch 作业。但是,AWS 最近添加了从一个步骤直接调用其他服务(如 AWS Batch)的功能。我很想使用这个新功能,但无法让它工作。
https://docs.aws.amazon.com/step-functions/latest/dg/connectors-batch.html
所以我想用来调用 Batch 的新步骤操作。
"File Copy": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobName": "MyBatchJob",
"JobQueue": "MySecondaryQueue",
"ContainerOverrides.$": "$.lts_job_container_overrides",
"JobDefinition.$": "$.lts_job_job_definition",
},
"Next": "Upload Start"
}
Run Code Online (Sandbox Code Playgroud)
请注意,我正在尝试使用 $. JSONpath 语法,以便动态地通过步骤传递参数。
当给出以下输入时
"lts_job_container_overrides": {
"environment": [
{
"name": "MY_ENV_VARIABLE",
"value": "XYZ"
},
],
"command": [
"/app/file_copy.py"
]
},
"lts_job_job_definition": "MyBatchJobDefinition"
Run Code Online (Sandbox Code Playgroud)
我预计环境和命令值将传递给 AWS Batch 中的相应参数 (ContainerOverrides)。相反,AWS Steps 似乎试图将它们提升为顶级参数 - 然后抱怨它们无效。
{
"error": "States.Runtime",
"cause": "An error occurred while executing the …Run Code Online (Sandbox Code Playgroud) 在AWS国语言规范描述的角色InputPath和Parameters领域,但并没有给被一起使用的过滤器的一个例子。
我的理解是,如果指定,则InputPath字段给出的 JSON 路径将应用于生成有效输入的原始输入。然后,如果指定,则应用参数字段的值,修改有效输入。
"X": {
"Type": "Task",
"Resource": "arn:aws:swf:us-east-1:123456789012:task:X",
"Next": "Y",
"InputPath": "$.sub",
"Parameters": {
"flagged": true,
"parts": {
"first.$": "$.vals[0]",
"last3.$": "$.vals[3:]"
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后,给定以下输入:
{
"flagged": 7,
"sub" : {
"vals": [0, 10, 20, 30, 40, 50]
}
}
Run Code Online (Sandbox Code Playgroud)
对该Resource字段中确定的代码的有效输入是:
{
"flagged": true,
"parts": {
"first": 0,
"last3": [30, 40, 50]
}
}
Run Code Online (Sandbox Code Playgroud)
我的解释正确吗?
我在 AWS 中有一个状态机。我想限制任务(通过 lambda 创建)的并发性,以减少到我的下游 API 之一的流量。
我可以限制 lambda 并发,但任务因“Lambda.TooManyExecutions”失败而失败。有人可以分享一个简单的方法来限制 lambda 任务的并发性吗?
谢谢,维诺德。
我用一个步骤编写了一个简单的 AWS 步骤函数工作流程:
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain, TuningStep
from stepfunctions.workflow import Workflow
import train_utils
def main():
workflow_execution_role = 'arn:aws:iam::MY ARN'
execution_input = ExecutionInput(schema={
'app_id': str
})
estimator = train_utils.get_estimator()
tuner = train_utils.get_tuner(estimator)
tuning_step = TuningStep(state_id="HP Tuning", tuner=tuner, data={
'train': f's3://my-bucket/{execution_input["app_id"]}/data/'},
wait_for_completion=True,
job_name='HP-Tuning')
workflow_definition = Chain([
tuning_step
])
workflow = Workflow(
name='HP-Tuning',
definition=workflow_definition,
role=workflow_execution_role,
execution_input=execution_input
)
workflow.create()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
我的目标是从运行时提供的执行 JSON 中提取训练输入。当我执行工作流(从步骤函数控制台)时,提供 JSON{"app_id": "My App ID"}调整步骤不会获得正确的数据,而是获得stepfunctions.inputs.placeholders.ExecutionInput. 此外,在查看生成的 ASL 时,我可以看到执行输入被呈现为字符串:
... …Run Code Online (Sandbox Code Playgroud) python machine-learning state-machine hyperparameters aws-step-functions
我创建了一个包含 4 个不同活动的步骤函数,这些活动依次运行,并且还集成为从 java 应用程序触发此步骤函数。流程看起来像这样。
开始 -> Activity1 -> Activity2 -> Activity3 -> Activity4 -> 停止
当某个活动(例如 Activity2)期间执行失败时,该执行将被标记为失败。
现在,是否可以从先前失败的活动(Activity2)中恢复此失败的执行,而不是开始新的执行?
我经历了可能的操作AWSStepFunctions,但似乎没有一个能解决这个要求。
https://docs.aws.amazon.com/step-functions/latest/apireference/API_Operations.html
我早在 8 月份就在论坛上发布了这个问题,询问 V3 JavaScript API 何时会像 V2 SDK 中那样添加对 AWS Step Functions 的支持。我在该线程上没有听到任何消息。
是否有人拥有可供我从 V2 SDK 迁移的替代解决方案?
我想在发布 SNS 消息时执行我的步骤函数并使用它。对此最好的解决方案是什么?
我知道一种选择是使用 Lambda,订阅 SNS 主题,然后从 Lambda 内部触发 SF...我想知道是否有任何(更简单的)解决方案无需此中间步骤。
amazon-web-services amazon-sns aws-lambda aws-step-functions
背景:我正在尝试向 AWS Step Functions 中的状态机
添加步骤。DynamoDB:GetItemGetItem API 接受以下格式的输入:
{
"TableName": "MyDynamoDBTable",
"Key": {
"Column": {
"S": "MyEntry"
}
}
}
Run Code Online (Sandbox Code Playgroud)
其中“Column”是主键名称,“MyEntry”是主键值。问题是我希望能够使用 JSON 路径引用动态指定主键名称和值。
不幸的是,AWS 不允许我传递主键名称(“Column”)的值引用。所以我不能做类似的事情
{
"TableName": "MyDynamoDBTable",
"Key.$": {
"$.ColumnName": {
"S": "MyEntry"
}
}
}
Run Code Online (Sandbox Code Playgroud)
问题:
我能想到的唯一解决方法(虽然有点难看)是使用States.StringToJson和States.Format内部函数的组合首先生成字段输入的字符串化版本Key.$,然后从字符串转换为 JSON。就像是:
{
"TableName.$": "$.TableName",
"Key.$": "States.StringToJson(States.Format('\{\"{}\":\{\"S.$\":\"{}\"\}\}', $.PrimaryKeyName, $.PrimaryKeyValue))"
}
Run Code Online (Sandbox Code Playgroud)
理论上它应该有效,但 AWS Step Functions 似乎对转义双引号不满意?它无法解析上面的定义。
所以我的问题是:
有没有办法让这项工作发挥作用?(通过某种方式转义双引号,或者通过完全不同的方法)