RabbitMQ非阻塞消费者

Hug*_*usa 12 python queue parallel-processing asynchronous rabbitmq

我在Python中使用RabbitMQ来管理生产者和多个消费者之间的几个队列.在RabbitMQ网站(路由模型)的示例中,消费者被阻止.这意味着它们会在start_consuming()上停止并在每次队列中有新的"任务"时执行回调函数.

我的问题是:如何以他仍在等待任务的方式实现我的消费者(因此,每次队列中有新的东西时都会调用回调函数)但同时他可以执行其他工作/代码.

谢谢

adn*_*leb 5

表格常见问题

Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,

所以让我们在线程内部创建连接:

import pika


class PikaMassenger():

    exchange_name = '...'

    def __init__(self, *args, **kwargs):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name, 
            exchange_type='topic')

    def consume(self, keys, callback):
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        for key in keys:
            self.channel.queue_bind(
                exchange=self.exchange_name, 
                queue=queue_name, 
                routing_key=key)

        self.channel.basic_consume(
            queue=queue_name, 
            on_message_callback=callback, 
            auto_ack=True)

        self.channel.start_consuming()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.close()

def start_consumer():

    def callback(ch, method, properties, body):
        print(" [x] %r:%r consumed" % (method.routing_key, body))

    with PikaMassenger() as consumer:
        consumer.consume(keys=[...], callback=callback)


consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()
Run Code Online (Sandbox Code Playgroud)


小智 -4

对于接收器

import pika

messages = []
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='message')

def callback(ch, method, properties, message):
    print(message)
    messages.append(message)

channel.basic_consume(callback,queue='message',no_ack=True)
Run Code Online (Sandbox Code Playgroud)

channel.basic_consume(callback,queue='message',no_ack=True)
Run Code Online (Sandbox Code Playgroud)

当你需要时) 或在线程中

import threading

import pika
import time

messages = []

def recieve_messages():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        messages.append(body)

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    # channel.start_consuming()
    mq_recieve_thread = threading.Thread(target=channel.start_consuming)
    mq_recieve_thread.start()

recieve_messages()
while True:
    print(messages)
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

  • Pika 不是线程安全的 - API 不支持此功能 (2认同)
  • Robben_Ford 是对的,人们不应该将线程与鼠兔一起使用。“Pika 在代码中没有任何线程概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个在该线程中创建的 Pika 连接。跨线程共享一个 Pika 连接是不安全的。” -> 来自 pika 文档常见问题解答 (2认同)