Pika + RabbitMQ:将basic_qos设置为prefetch = 1仍然会消耗队列中的所有消息

gro*_*wse 16 qos rabbitmq pika

我有一个python worker客户端,它会旋转10个worker,每个worker挂钩到RabbitMQ队列.有点像这样:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,尽管在通道上设置了basic_qos,但是第一个启动的工作人员接受队列中的所有消息,而其他人则闲置在那里.我可以在rabbitmq界面中看到这一点,即使我设置worker_count为1并在队列中转储50条消息,所有50条消息进入"未确认"桶,而我预计1会成为未确认的,而另外49条将成为准备.

为什么这不起作用?

gro*_*wse 18

我似乎通过移动到哪里来解决这个问题basic_qos.

在它之后放置它channel = connection.channel()似乎改变了我所期望的行为.

  • 我认为我们应该在`basic_consume`之前声明`basic_qos`.因为basic_consume在初始化时使用此设置. (5认同)
  • 同意@rborodinov。我在“basic_consume”之后立即使用了“basic_qos”,但它不起作用。切换了它们,现在可以正常工作了。 (2认同)