1 timeout amazon-sqs amazon-web-services aws-lambda
我们的流程需要 30 秒到 5 分钟不等,具体取决于事件。我们的基本架构如下(python 3.9,boto3将send_message发送到sqs):
VPC和NAT网关已配置。
我们发现,尽管函数调用已提前成功完成,但后续消息始终以 10 分钟为间隔进行处理。我们打算更有效地管理时间,并尝试将消息可见性超时默认缩短为 1 分钟,并在函数 B 执行期间每 30 秒延长 1 分钟(我们使用 boto3 client.change_message_visibility)。看起来它不起作用 - 几乎所有消息最终都会进入我们的死信队列(CloudWatch 日志没有表明任何错误)。
我们的代码:
Globals:
Function:
Timeout: 30
Environment:
Variables:
SQS_QUEUE_URL: !Ref SqsQueue
Resources:
SqsQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 600
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- DeadLetterQueue
- Arn
maxReceiveCount: 10
DeadLetterQueue:
Type: AWS::SQS::Queue
LambdaFunctionBRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: receiveAndDeleteFromQueue
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
sqs:ReceiveMessage,
sqs:ChangeMessageVisibility,
sqs:DeleteMessage,
sqs:GetQueueAttributes
]
Resource: !GetAtt SqsQueue.Arn
- PolicyName: accessVpc
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
logs:CreateLogGroup,
logs:CreateLogStream,
logs:PutLogEvents,
ec2:CreateNetworkInterface,
ec2:DescribeNetworkInterfaces,
ec2:DeleteNetworkInterface,
ec2:AssignPrivateIpAddresses,
ec2:UnassignPrivateIpAddresses
]
Resource: "*"
LambdaFunctionB:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/function_b/
Handler: app.lambda_handler
Runtime: python3.9
Role: !GetAtt LambdaFunctionBExecutionRole.Arn
Architectures:
- x86_64
VpcConfig:
SecurityGroupIds:
- "xxx"
SubnetIds:
- "xxx"
- "xxx"
- "xxx"
FileSystemConfigs:
- Arn: !GetAtt AccessPoint.Arn
LocalMountPath: /mnt/lambda
ReservedConcurrentExecutions: 1
Events:
SqsEvent:
Type: SQS
Properties:
Queue: !GetAtt SqsQueue.Arn
BatchSize: 1
Enabled: True
Timeout: 600
MemorySize: 512
Run Code Online (Sandbox Code Playgroud)
Lambda函数B代码app.py:
import os
import boto3
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
<some business logic here>
return status
Run Code Online (Sandbox Code Playgroud)
import os
import boto3
from threading import Timer
from time import sleep
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
class RepeatingTimer(Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
sqs_client.change_message_visibility(
QueueUrl=sqs_queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=timeout
)
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
t.start()
<some business logic here>
increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
t.cancel()
return status
Run Code Online (Sandbox Code Playgroud)
处理来自 Amazon SQS 队列的消息的正常流程是:
一般来说,不可见超时应该设置得足够高,以确保它仅在处理出现问题时触发。例如,如果处理一条消息通常需要 1 分钟,您可以将不可见时间设置为 4 分钟。这允许处理可能需要更长时间的异常情况,而不会在实际成功处理消息时意外地重新处理消息。
你的情况
您提到“lambda 等待 SQS 队列超时后才开始处理下一条消息”。这表明您的过程中发生了一些奇怪的事情。通常,当 Lambda 函数退出且没有错误时,Lambda 服务将使用下一条消息再次调用 Lambda 函数。消息不可见期与 Lambda 函数执行时间之间没有直接联系,也就是说,根据您的配置,当达到消息超时时,没有任何内容会“杀死”现有 Lambda 函数,也没有任何内容会终止现有 Lambda 函数。延迟下一次 Lambda 调用,直到发生消息超时。然而,当 Lambda 函数超时时,该函数将被“终止”。因此,听起来您的 Lambda 函数在完成消息处理之前就超时了。您应该将 Lambda 函数的超时设置为预期运行持续时间的几倍。(这是与 SQS 队列不可见期不同的配置。)
| 归档时间: |
|
| 查看次数: |
3369 次 |
| 最近记录: |