任何人都有使用 S3 事件触发阶跃函数的经验?

OD *_*eet 8 amazon-s3 aws-step-functions

我正在学习 step 函数,特别是,我试图弄清楚如何使用 S3 事件触发状态机执行。我正在阅读这篇文章:https : //docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html。该文档确实提供了有关如何配置内容的粗略指南,但我仍然不清楚以下问题:

  1. 状态机输入是什么意思?所以本文档没有过多解释输入中每个字段的含义。有谁知道在哪里可以找到它的文档?例如,此输入中的 id 字段是什么?

  2. Java lambda 如何从输入中检索有用信息?我看到了有关如何操作在状态机模式(cloudformation 或 Amazon Statemachine Lamguage)中预定义的输入的文档,但没有操作由 s3 事件自动生成的输入。

有没有人在使用状态机 + s3 事件之前构建过类似的功能?任何想法将不胜感激。

lex*_*ore 17

我们有一个类似的任务 - 通过一个 S3 事件启动 StepFunctions 状态机 - 稍作修改。我们想根据上传文件的扩展名启动不同的状态机。

最初,我们遵循了您所指的相同教程。以 StepFunctions 状态机为目标的 CloudTrail 规则。

但是后来我们意识到我们并不能真正通过文件扩展名过滤S3事件(至少我们找不到方法)。

最后,我们设法以不同的方式解决它:

  • S3 存储桶配置了通知,这些通知会触发特定 S3 对象键后缀的某些 lambda 函数(如果您愿意,则为文件扩展名)
  • Lambda 函数获取 S3 事件作为输入,根据需要对其进行转换,并使用转换后的输入启动 StepFunctions 步进机。
  • StepFunctions 状态机以 lambda 函数创建的输入启动并照常执行

与 CloudTrail 解决方案相比,这有点复杂,因为我们部署了一个额外的 lambda 函数。但是我们可以根据需要过滤 S3 事件,并且我们还可以完全控制馈送到状态机的内容。所以我认为这个解决方案比 CloudTrail 解决方案更灵活。

我现在将分享我们解决方案的一些细节。我将不得不大幅削减我们的代码,因此不能保证这会在 OOTB 中工作,但是,希望这足以让您了解这个想法。


上传桶

  UploadsInboundBucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub >-
        ${AWS::AccountId}-uploads-inbound-bucket
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt StartVideoclipStateMachineExecutionFunction.Arn
            Event: 's3:ObjectCreated:*'
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: mp4
          - Function: !GetAtt StartVideoStateMachineExecutionFunction.Arn
            Event: 's3:ObjectCreated:*'
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: json
Run Code Online (Sandbox Code Playgroud)

s3:ObjectCreated:*触发(取决于对象键的后缀)两个 lambda 函数之一StartVideoclipStateMachineExecutionFunctionStartVideoStateMachineExecutionFunction.

此处详细描述了馈送到 lambda 函数的 S3 事件:https : //docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html

Lambda 函数只是解析输入,构建状态机输入并启动状态机。

var stepfunction = require('./stepfunction');
var aws = require('aws-sdk');
var parser = require('./parser');

exports.handler = (event, context, callback) => { 

  var statemachineArn = process.env.statemachine_arn;
  var stepfunctions = new aws.StepFunctions();
  
  stepfunction.startExecutionFromS3Event(stepfunctions, parser, statemachineArn , event);

  callback(null, event);
    
};
Run Code Online (Sandbox Code Playgroud)

解析 S3 事件:

module.exports = {
  parseEvent : function(event)
  {
    return event.Records[0].s3.bucket.arn + '/'+  event.Records[0].s3.object.key;
  }
};
Run Code Online (Sandbox Code Playgroud)

启动状态机执行:

module.exports = {
    startExecutionFromS3Event : function(stepfunctions, parser, statemachineArn , event)
    {
        //get affected S3 object from Event
        var arn = parser.parseEvent(event);

        //Create input for Step
        var input = {
            "detail" : {
                "resources" : [
                    {
                        "type": "AWS::S3::Object",
                        "ARN": arn
                    }
                ]
            }
        };
    
        //start step function execution
        var params = {
            stateMachineArn: statemachineArn,
            input: JSON.stringify(input)
        };


        stepfunctions.startExecution(params, function (err, data) {
            if (err) {
                console.log('err while executing step function')
                console.log(JSON.stringify(err));
            } else {
                console.log('started execution of step function')
            }
        });
    
    }
}
Run Code Online (Sandbox Code Playgroud)

您还需要大量 IAM 角色的权限才能完成所有这些工作(例如,必须允许 lambda 函数启动状态机),但此时我将省略它。