如何重新连接RabbitMQ?

Ala*_*ano 13 python rabbitmq pika

我的python脚本一旦从另一个数据源收到消息,就不得不向RabbitMQ发送消息.python脚本发送它们的频率可以变化,例如1分钟到30分钟.

以下是我与RabbitMQ建立连接的方法:

  rabt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
  channel = rbt_conn.channel()
Run Code Online (Sandbox Code Playgroud)

我刚才有例外

pika.exceptions.ConnectionClosed
Run Code Online (Sandbox Code Playgroud)

我该如何重新连接呢?什么是最好的方式?有没有"策略"?是否有能力发送ping以保持连接活动或设置超时?

任何指针将不胜感激.

el.*_*omo 13

RabbitMQ使用心跳来检测和关闭"死"连接,并防止网络设备(防火墙等)终止"空闲"连接.从版本3.5.5开始,默认超时设置为60秒(之前为~10分钟).来自文档:

每次超时/ 2秒发送心跳帧.在两次错过心跳后,对等体被认为无法访问.

与鼠兔的问题BlockingConnection是,它无法对心跳回应,直到一些API调用时(例如channel.basic_publish(),connection.sleep()等).

到目前为止我找到的方法:

增加或停用超时

RabbitMQ在建立连接时与客户端协商超时.从理论上讲,应该可以使用heartbeat_interval参数覆盖较大的服务器默认值,但当前的Pika版本(0.10.0)使用服务器和客户端提供的最小值.此问题已在当前主服务器上修复.

另一方面,可以通过将heartbeat_interval参数设置为完全停用心跳功能0,这可能会引发新问题(防火墙丢弃连接等)

重新连接

扩展@theafire的答案,您可以编写自己的发布者类,让您在需要时重新连接.一个简单的实现示例:

import logging
import json
import pika

class Publisher:
    EXCHANGE='my_exchange'
    TYPE='topic'
    ROUTING_KEY = 'some_routing_key'

    def __init__(self, host, virtual_host, username, password):
        self._params = pika.connection.ConnectionParameters(
            host=host,
            virtual_host=virtual_host,
            credentials=pika.credentials.PlainCredentials(username, password))
        self._conn = None
        self._channel = None

    def connect(self):
        if not self._conn or self._conn.is_closed:
            self._conn = pika.BlockingConnection(self._params)
            self._channel = self._conn.channel()
            self._channel.exchange_declare(exchange=self.EXCHANGE,
                                           type=self.TYPE)

    def _publish(self, msg):
        self._channel.basic_publish(exchange=self.EXCHANGE,
                                    routing_key=self.ROUTING_KEY,
                                    body=json.dumps(msg).encode())
        logging.debug('message sent: %s', msg)

    def publish(self, msg):
        """Publish msg, reconnecting if necessary."""

        try:
            self._publish(msg)
        except pika.exceptions.ConnectionClosed:
            logging.debug('reconnecting to queue')
            self.connect()
            self._publish(msg)

    def close(self):
        if self._conn and self._conn.is_open:
            logging.debug('closing queue connection')
            self._conn.close()
Run Code Online (Sandbox Code Playgroud)

其他可能性

我尚未探索的其他可能性:


its*_*ire 7

死简单:这样的一些模式.

import time

while True:
    try:
        communication_handles = connect_pika()
        do_your_stuff(communication_handles)
    except pika.exceptions.ConnectionClosed:
        print 'oops. lost connection. trying to reconnect.'
        # avoid rapid reconnection on longer RMQ server outage
        time.sleep(0.5) 
Run Code Online (Sandbox Code Playgroud)

你可能不得不重新考虑你的代码,但基本上它是关于捕获异常,缓解问题并继续做你的东西.将communication_handles包含所有的鼠兔元素,如渠道,队列和任何你需要的东西通过鼠兔与RabbitMQ的沟通.

  • 很好,但是这个解决方案将进入最大递归深度,以防服务器离开更长时间. (3认同)