Pika:如何同步使用消息

Ale*_*lin 4 python amqp rabbitmq pika

我想定期运行一个进程(比如每10分钟一次,或者每小时一次),从队列中获取所有消息,处理它们然后退出.有没有办法用pika或者我应该使用不同的python库?

ean*_*son 8

我认为这里的理想解决方案是使用basic_get方法.它将获取单个消息,但如果队列已经为空,则将返回None.这样做的好处是你可以通过一个简单的循环清除队列,然后只需None返回一次循环,加上运行basic_get与多个使用者是安全的.

这个例子基于我自己的库; amqpstorm,但你也可以轻松实现与pika相同.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, lets break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
Run Code Online (Sandbox Code Playgroud)


ber*_*eal 3

这对你有用吗:

  1. 测量当前队列长度为N = queue.method.message_count
  2. 让回调对已处理的消息进行计数,并在N处理完毕后立即调用channel.stop_consuming

所以,客户端代码将是这样的:

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here
        self.count -= 1
        if not self.count:
            ch.stop_consuming()

channel = conn.channel()
queue = channel.queue_declare('tasks')
callback = CountCallback(queue.method.message_count)
channel.basic_consume(callback, queue='tasks')
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)