在多个python进程之间共享RabbitMQ通道

Sha*_*han 2 python multiprocessing rabbitmq pika

我想共享BlockingChannel多个python进程。为了basic_ack从其他python进程发送 。

如何BlockingChannel在多个python进程之间共享。

以下是代码:

self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()
Run Code Online (Sandbox Code Playgroud)

我尝试转储使用,pickle但它确实允许转储频道并can't pickle select.epoll objects 使用以下代码给出错误

filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))
Run Code Online (Sandbox Code Playgroud)

目标:

目标是basic_ack从其他python进程的通道发送。

nox*_*fox 5

在多个线程之间共享一个通道是一种反模式,并且您不太可能设法在进程之间共享它。

经验法则是connection每个进程1个,channel每个线程1 个。

您可以通过以下链接阅读有关此问题的更多信息:

  1. 13个常见的RabbitMQ错误
  2. RabbitMQ最佳实践
  3. SO线程对RabbitMQ和并发消耗进行了深入分析

如果要将消息使用量与多处理功能结合在一起,通常的模式是让主进程接收消息,将其有效负载传递到工作进程池中,并在完成后对其进行确认。

使用pika.BlockingChannel和的简单示例concurrent.futures.ProcessPoolExecutor

def ack_message(channel, delivery_tag, _future):
    """Called once the message has been processed.
    Acknowledge the message to RabbitMQ.
    """
    channel.basic_ack(delivery_tag=delivery_tag)

for message in channel.consume(queue='example'):
    method, properties, body = message

    future = pool.submit(process_message, body)
    # use partial to pass channel and ack_tag to callback function
    ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
    future.add_done_callback(ack_message_callback)      
Run Code Online (Sandbox Code Playgroud)

上面的循环将无休止地消耗example队列中的消息,并将其提交到进程池。您可以通过RabbitMQ 使用者预取参数控制要同时处理的消息数量。检查pika.basic_qos以了解如何在Python中进行操作。