使用Kombu ConsumerMixin,如何声明多个绑定?

smi*_*lli 4 python amqp rabbitmq kombu

我有一个名为RabbitMQ主题的交换experiment.我正在建立一个消费者,我希望收到路由密钥以"foo"开头的所有消息以及路由密钥以"bar"开头的所有消息.

根据RabbitMQ文档,并基于我自己在管理UI中的实验,应该可以有一个交换,一个队列和两个连接它们的绑定(foo.#bar.#).

我无法弄清楚如何使用Kombu的ConsumerMixin来表达这一点.我觉得我应该能做到:

q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])
Run Code Online (Sandbox Code Playgroud)

......但它完全不喜欢这样.我也尝试过:

q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')
Run Code Online (Sandbox Code Playgroud)

...但每次我尝试我得到:

kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel
Run Code Online (Sandbox Code Playgroud)

...我猜鬃毛感觉到了.但是我无法在mixin的界面中看到一个位置,一旦绑定到通道,我就可以轻松地挂钩到队列.这是基础(工作)代码:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin


class Worker(ConsumerMixin):
    exchange = Exchange('experiment', type='topic')
    q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.q], callbacks=[self.on_task])]

    def on_task(self, body, message):
        print body
        message.ack()


if __name__ == '__main__':
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        worker = Worker(conn)
        worker.run()
Run Code Online (Sandbox Code Playgroud)

......哪个有效,但只给我foo留言.除了为我感兴趣的每个路由密钥创建一个新的Queue并将它们全部传递给Consumer之外,还有一种干净的方法吗?

smi*_*lli 7

在挖掘了一点之后,我找到了一种方法来实现这一点,这与我的第一个想法非常接近.不是将routing_key字符串传递给队列,而是传递bindings列表.列表中的每个元素都是binding指定交换和路由键的对象的实例.

一个例子胜过千言万语:

from kombu import Exchange, Queue, binding

exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
    binding(exchange, routing_key='foo.#'),
    binding(exchange, routing_key='bar.#')
], exclusive=True)
Run Code Online (Sandbox Code Playgroud)

而且效果很棒!

  • 您不需要为“队列”提供“交换”参数,因为您提供的是“绑定”参数? (2认同)