khi*_*ter 17 aws-lambda aws-step-functions
我有一个用例,我有一个AWS Step功能,当文件上传到S3时触发,从那里第一步运行ffprobe从外部服务获取文件的持续时间,如transloadit,其中写入输出回到S3.
我可以从该事件创建一个新的step函数,但是如果可以在原始step函数中有一个Await promise然后继续到下一个函数,我就会徘徊 - 考虑到ffprobe可能需要更长的时间才能恢复.
任何建议都非常感谢如何解决这个问题.
现在,AWS Step Functions作为第一类,支持长时间运行的步骤的异步回调。
这类似于@mixja的答案,但经过简化。工作流程中的单个状态可以直接调用Lambda,SNS,SQS或ECS并等待对的调用SendTaskSuccess。
有一个针对SQS记录的好示例,其中步进函数发送消息并暂停工作流程执行,直到提供回调为止。Lambda是等效的(假定像transloadit这样的主要处理过程发生在Lambda本身之外)
您的步进函数定义如下所示
"Invoke transloadit": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "InvokeTransloadit",
"Payload": {
"some_other_param": "...",
"token.$": "$$.Task.Token"
}
},
"Next": "NEXT_STATE"
}
Run Code Online (Sandbox Code Playgroud)
然后在您的Lambda中,您将执行以下操作
def lambda_handler(event, context):
token = event['token']
# invoke transloadit via SSM, ECS, passing token along
Run Code Online (Sandbox Code Playgroud)
那么在您长期运行的主要流程中,您将使用令牌(如aws stepfunctions send-task-success --task-token $token来自shell脚本/ CLI 的令牌)或与API调用类似的令牌发出回调。
当您向 transloadit 发送请求时,根据上传的文件密钥以可预测的密钥将步骤的 taskToken 保存在 s3 中。例如,如果媒体文件位于“s3://my-media-bucket/foobar/media-001.mp3”,您可以制作一个包含当前步骤的任务令牌的 JSON 文件,并使用相同的密钥存储它在不同的存储桶中,例如“s3://ffprobe-tasks/foobar/media-001.mp3.json”。在将媒体发送到 transloadit 的步骤结束时,不要调用该步骤的成功或失败 - 使其保持运行。
然后,当您收到 s3 通知,表明 transloadit 结果已准备就绪时,您可以确定 s3 key 来获取任务令牌('s3://ffprobe-tasks/foobar/media-001.json'),加载 JSON(并删除它来自 s3) 并发送该任务的成功。步骤函数将继续执行到下一个状态。
无法提出简单的解决方案,只能探索几个方向。
首先,Step Functions 有一种特定的方法来处理长时间运行的后台工作:活动。https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html它基本上是一个队列。
如果您想要 100% 无服务器,这将变得复杂或丑陋。
Retry或者,使用自定义错误代码和子句在状态机中进行 S3 轮询循环如果您可以为后台工作人员分配“1/8 micro”实例,这并不优雅,但很容易,并且可以通过即时反应来实现。低硬件要求暗示我们将仅使用机器进行同步。
定义 StepFunction 活动,例如命名为video-duration。定义 SQS 队列以进行即时反应或轮询 S3 以获取持续时间结果。
状态函数伪代码:
{
StartAt: ffprobe
ffprobe: {
Type: Task
Resource: arn:...lambda:launch-ffprobe
Next: wait-duration
}
wait-duration: {
Type: Task
Resource: arn...activity:video-duration
End: true
}
}
Run Code Online (Sandbox Code Playgroud)
后台工作者伪代码:
statemap = dict/map filename to result
thread1:
loop:
taskToken, input = SF.GetActivityTask('video-duration') # long poll
sync(key=input.filename, waiter=taskToken)
thread2:
loop:
msg = SQS.ReceiveMessage(...) # or poll S3
sync(key=msg.filename, duration=msg.result)
function sync(key, waiter, duration):
state = statemap[key]
if waiter:
state.waiter = waiter
if duration:
state.duration = duration
if state.waiter and state.duration:
SF.SendTaskSuccess(state.waiter, state.duration)
Run Code Online (Sandbox Code Playgroud)
S3触发伪代码:
if filename is video:
SF.StartExecution(...)
else if filename is duration:
content = S3.GetObject(filename)
SQS.SendMessage(queue, content)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10222 次 |
| 最近记录: |