Python和RabbitMQ - 从多个渠道收听消费事件的最佳方式?

bli*_*ile 10 python rabbitmq pika

我有两个独立的RabbitMQ实例.我正试图找到听取两者事件的最佳方式.

例如,我可以使用以下内容在一个上使用事件:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

我有第二个主持人,"host2",我也想听.我想创建两个单独的线程来做到这一点,但从我读过的,pika不是线程安全的.有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的Rabbit实例(host1和host2)就足够了?

Uni*_*t03 36

"什么是最好的方式"的答案在很大程度上取决于您的队列使用模式以及"最佳"的含义.由于我还没有对问题发表评论,我将尝试提出一些可能的解决方案.

在每个例子中,我将假设已经宣布交换.

主题

您可以使用单个进程在单独的主机上使用来自两个队列的消息pika.

你是对的 - 正如它自己的FAQ所说,pika它不是线程安全的,但它可以通过创建每个线程的RabbitMQ主机连接以多线程方式使用.使用threading模块在线程中运行此示例如下所示:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()
Run Code Online (Sandbox Code Playgroud)

我已经声明callback_func了一种纯粹ConsumerThread.name在打印邮件正文时使用的方法.它也可能是课外的一种功能ConsumerThread.

流程

或者,您始终可以使用每个要使用事件的队列的使用者代码运行一个进程.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

然后运行:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console
Run Code Online (Sandbox Code Playgroud)

如果您对来自队列的消息所做的工作占用大量CPU,并且只要CPU中的核心数量> =消费者数量,通常最好使用此方法 - 除非您的队列在大多数情况下都是空的,消费者不会利用这个CPU时间*.

异步

另一种选择是涉及一些异步框架(例如Twisted)并在单线程中运行整个事物.

您不能再BlockingConnection在异步代码中使用; 幸运的是,pika有适配器Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()
Run Code Online (Sandbox Code Playgroud)

这种方法会更好,消耗的队列越多,消费者执行的工作就越少.*.

Python 3

既然你已经提到过pika,我已经限制自己使用基于Python 2.x的解决方案了,因为pika还没有移植过.

但是如果你想要移动到> = 3.3,一个可能的选择是使用asyncioAMQP协议之一(你用RabbitMQ说的协议),例如asynqpaioamqp.

* - 请注意,这些是非常浅的提示 - 在大多数情况下,选择并不那么明显; 什么对你最好的取决于队列"饱和度"(消息/时间),你收到这些消息后做了什么工作,你的消费者在哪个环境等; 除了对所有实现进行基准测试之外,没有办法确定