AWS步骤功能 - 等到事件发生

khi*_*ter 17 aws-lambda aws-step-functions

我有一个用例,我有一个AWS Step功能,当文件上传到S3时触发,从那里第一步运行ffprobe从外部服务获取文件的持续时间,如transloadit,其中写入输出回到S3.

我可以从该事件创建一个新的step函数,但是如果可以在原始step函数中有一个Await promise然后继续到下一个函数,我就会徘徊 - 考虑到ffprobe可能需要更长的时间才能恢复.

任何建议都非常感谢如何解决这个问题.

wrs*_*der 6

现在,AWS Step Functions作为第一类,支持长时间运行的步骤的异步回调。

这类似于@mixja的答案,但经过简化。工作流程中的单个状态可以直接调用Lambda,SNS,SQS或ECS并等待对的调用SendTaskSuccess

有一个针对SQS记录的好示例,其中步进函数发送消息并暂停工作流程执行,直到提供回调为止。Lambda是等效的(假定像transloadit这样的主要处理过程发生在Lambda本身之外)

您的步进函数定义如下所示

"Invoke transloadit": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
  "Parameters": {
    "FunctionName": "InvokeTransloadit",
    "Payload": {
        "some_other_param": "...",
        "token.$": "$$.Task.Token"
     }
  },
  "Next": "NEXT_STATE"
}
Run Code Online (Sandbox Code Playgroud)

然后在您的Lambda中,您将执行以下操作

def lambda_handler(event, context):
    token = event['token']

    # invoke transloadit via SSM, ECS, passing token along
Run Code Online (Sandbox Code Playgroud)

那么在您长​​期运行的主要流程中,您将使用令牌(如aws stepfunctions send-task-success --task-token $token来自shell脚本/ CLI 的令牌)或与API调用类似的令牌发出回调。


ivo*_*ivo 5

当您向 transloadit 发送请求时,根据上传的文件密钥以可预测的密钥将步骤的 taskToken 保存在 s3 中。例如,如果媒体文件位于“s3://my-media-bucket/foobar/media-001.mp3”,您可以制作一个包含当前步骤的任务令牌的 JSON 文件,并使用相同的密钥存储它在不同的存储桶中,例如“s3://ffprobe-tasks/foobar/media-001.mp3.json”。在将媒体发送到 transloadit 的步骤结束时,不要调用该步骤的成功或失败 - 使其保持运行。

然后,当您收到 s3 通知,表明 transloadit 结果已准备就绪时,您可以确定 s3 key 来获取任务令牌('s3://ffprobe-tasks/foobar/media-001.json'),加载 JSON(并删除它来自 s3) 并发送该任务的成功。步骤函数将继续执行到下一个状态。


tem*_*oto 1

无法提出简单的解决方案,只能探索几个方向。

首先,Step Functions 有一种特定的方法来处理长时间运行的后台工作:活动。https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html它基本上是一个队列。

如果您想要 100% 无服务器,这将变得复杂或丑陋。

  • 正如您所说,为每个文件创建新的步骤函数
  • Retry或者,使用自定义错误代码和子句在状态机中进行 S3 轮询循环

如果您可以为后台工作人员分配“1/8 micro”实例,这并不优雅,但很容易,并且可以通过即时反应来实现。低硬件要求暗示我们将仅使用机器进行同步。

定义 StepFunction 活动,例如命名为video-duration。定义 SQS 队列以进行即时反应或轮询 S3 以获取持续时间结果。

状态函数伪代码:

{
  StartAt: ffprobe
  ffprobe: {
    Type: Task
    Resource: arn:...lambda:launch-ffprobe
    Next: wait-duration
  }
  wait-duration: {
    Type: Task
    Resource: arn...activity:video-duration
    End: true
  }
}
Run Code Online (Sandbox Code Playgroud)

后台工作者伪代码:

statemap = dict/map filename to result

thread1:
  loop:
    taskToken, input = SF.GetActivityTask('video-duration')  # long poll
    sync(key=input.filename, waiter=taskToken)
thread2:
  loop:
    msg = SQS.ReceiveMessage(...)  # or poll S3
    sync(key=msg.filename, duration=msg.result)

function sync(key, waiter, duration):
  state = statemap[key]
  if waiter:
    state.waiter = waiter
  if duration:
    state.duration = duration
  if state.waiter and state.duration:
    SF.SendTaskSuccess(state.waiter, state.duration)
Run Code Online (Sandbox Code Playgroud)

S3触发伪代码:

if filename is video:
  SF.StartExecution(...)
else if filename is duration:
  content = S3.GetObject(filename)
  SQS.SendMessage(queue, content)
Run Code Online (Sandbox Code Playgroud)