轮询AWS SQS队列并从队列中删除收到的消息的最佳实践?

Mo.*_*Mo. 15 python boto amazon-sqs amazon-web-services

我有一个SQS队列,一直由数据使用者填充,我现在正在尝试使用Python的boto创建从SQS获取此数据的服务.

我设计它的方式是,我将有10-20个线程都尝试从SQS队列中读取消息,然后在返回队列以获取下一批之前执行他们必须对数据(业务逻辑)执行的操作数据一旦完成.如果没有数据,他们将等待一些数据可用.

我有两个方面我不确定这个设计

  1. 是否使用long time_out值调用receive_message()并且如果在20秒内没有返回任何内容(允许的最大值),那么只需重试?或者是否存在仅在数据可用时返回的阻止方法?
  2. 我注意到,一旦收到消息,它就不会从队列中删除,我是否必须收到消息,然后在收到消息后再发送另一个请求将其从队列中删除?看起来有点矫枉过正.

谢谢

gar*_*aat 16

该方法的长轮询功能receive_message()是轮询SQS的最有效方法.如果返回没有任何消息,我会建议在重试之前短暂延迟,特别是如果你有多个读者.您甚至可能希望执行增量延迟,以便每个后续的空读取等待更长时间,这样您就不会受到AWS的限制.

是的,您必须在阅读后删除该消息,否则它将重新出现在队列中.在工作人员阅读消息然后在完全处理消息之前失败的情况下,这实际上非常有用.在这种情况下,它将被另一名工人重新排队并读取.您还需要确保将消息的不可见性超时设置为足够长,以使工作人员有足够的时间处理消息,然后再自动重新出现在队列中.如果需要,如果需要的时间比预期的长,您的工作人员可以在处理时调整超时.


yge*_*her 6

如果您想要一种简单的方法来设置一个侦听器,该侦听器包括在处理完成后自动删除消息,并自动将异常推送到指定队列,则可以使用pySqsListener包.

您可以像这样设置一个监听器:

from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        run_my_function(body['param1'], body['param2']

listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
Run Code Online (Sandbox Code Playgroud)

目前该软件包使用短轮询,但我正在考虑使轮询类型可配置.

免责声明:我是该套餐的作者.


小智 5

另一个选择是使用AWS Beanstalk设置工作程序应用程序,如本博客文章中所述.

您的烧瓶应用程序将接收消息作为HTTP帖子中的json对象,而不是使用boto3进行长轮询.所设置的HTTP路径和消息类型可在AWS Elastic Beanstalk配置选项卡中配置:

在此输入图像描述

AWS Elastic Beanstalk具有额外的好处,即能够根据SQS队列的大小及其部署管理优势动态扩展工作器数量.

是我发现作为模板有用的示例应用程序.