我认为这里的理想解决方案是使用basic_get方法.它将获取单个消息,但如果队列已经为空,则将返回None
.这样做的好处是你可以通过一个简单的循环清除队列,然后只需None
返回一次循环,加上运行basic_get与多个使用者是安全的.
这个例子基于我自己的库; amqpstorm,但你也可以轻松实现与pika相同.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
result = channel.basic.get(queue='simple_queue', no_ack=False)
if not result:
print("Channel Empty.")
# We are done, lets break the loop and stop the application.
break
print("Message:", result['body'])
channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
Run Code Online (Sandbox Code Playgroud)
这对你有用吗:
N = queue.method.message_count
N
处理完毕后立即调用channel.stop_consuming
。所以,客户端代码将是这样的:
class CountCallback(object):
def __init__(self, count):
self.count = count
def __call__(self, ch, method, properties, body):
# process the message here
self.count -= 1
if not self.count:
ch.stop_consuming()
channel = conn.channel()
queue = channel.queue_declare('tasks')
callback = CountCallback(queue.method.message_count)
channel.basic_consume(callback, queue='tasks')
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)