如何使用Python中的boto库获取Amazon SQS队列中的所有消息?

Cha*_*guy 12 python boto amazon-sqs

我正在开发一个应用程序,其工作流程通过使用boto在SQS中传递消息来管理.

我的SQS队列逐渐增长,我无法检查它应该包含多少元素.

现在我有一个守护进程定期轮询队列,并检查我是否有一组固定大小的元素.例如,考虑以下"队列":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
Run Code Online (Sandbox Code Playgroud)

现在我要检查,如果我在队列一起在某个时间点"msg1_comp1","msg2_comp1"和"msg3_comp1",但我不知道队列的大小.

查看API后,您似乎只能获得1个元素或队列中固定数量的元素,但不是全部:

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10
Run Code Online (Sandbox Code Playgroud)

答案中提出的建议是在循环中获取10个消息,直到我什么也得不回来,但SQS中的消息具有可见性超时,这意味着如果我从队列中轮询元素,它们将不会被真正删除,它们只会在短时间内隐形.

是否有一种简单的方法来获取队列中的所有消息,而不知道有多少消息?

AJ.*_*AJ. 17

将您的呼叫置于q.get_messages(n)内部循环中:

all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
    all_messages.extend(rs)
    rs=q.get_messages(10)
Run Code Online (Sandbox Code Playgroud)

此外,dump也不会支持超过10条消息:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
Run Code Online (Sandbox Code Playgroud)

  • @linker-顺便说一句,消息数量仅应为1到10。如果您使用其他方法,则SQS服务应返回“ ReadCountOutOfRange”错误。 (2认同)

小智 15

我一直在使用AWS SQS队列来提供即时通知,因此我需要实时处理所有消息.以下代码将帮助您有效地将(所有)消息出列并在删除时处理任何错误.

注意:要从队列中删除消息,您需要删除它们.我正在使用更新的boto3 AWS python SDK,json库和以下默认值:

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)
Run Code Online (Sandbox Code Playgroud)


Jef*_*eff 6

我的理解是,SQS服务的分布式特性几乎使您的设计无法工作.每次调用get_messages时,您都会与另一组服务器通信,这些服务器将包含一些但不是全部的消息.因此,不可能"不时地检查"以设置特定的消息组是否准备就绪,然后只接受这些消息.

您需要做的是持续轮询,在到达时收集所有消息,并将它们本地存储在您自己的数据结构中.每次成功获取后,您都可以检查数据结构,以查看是否收集了一整套消息.

请记住,消息无序到达,并且一些消息被传递两次,因为删除必须传播到所有SQS服务器,但后续获取请求有时会超过删除消息.