在python/pika中使用多个队列

use*_*878 31 python rabbitmq pika

我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时处理它们.

问题是当第一个队列中已经存在某些数据时,它会占用第一个队列,而不会消耗第二个队列.但是,当第一个队列为空时,它会转到下一个队列,然后同时消耗这两个队列.

我首先实现了线程,但是想要避开它,当pika库为我做的时候没有太多的复杂性.以下是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

Chi*_*and 16

一种可能的解决方案是使用非阻塞连接并使用消息.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(callback, queue='queue2')


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()
Run Code Online (Sandbox Code Playgroud)

这将连接到多个队列,并相应地使用消息.

  • @RápliAndrás连接到rabbitmq时,需要指定virtualhost。默认主机为`/`,转义为`%2f`。 (2认同)

Gav*_*Roy 2

该问题很可能是因为第一个调用已发出 Basic.Consume,并且在发出第二个调用之前已从预填充队列接收到消息。您可能想要尝试将 QoS 预取计数设置为 1,这将限制 RabbitMQ 一次向您发送多条消息。