RabbitMQ破坏了管道错误或丢失了消息

Mat*_*t S 6 python amqp rabbitmq pika

使用pika库BlockingConnection连接到RabbitMQ,我偶尔会在发布消息时出错:

致命套接字错误:错误(32,'破管')

这是一个非常简单的子进程,它从内存中的队列中获取一些信息,并将一个小的JSON消息发送到AMQP.当系统几分钟没有发送任何消息时,似乎只出现错误.

建立:

connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)
Run Code Online (Sandbox Code Playgroud)

入队代码捕获任何连接错误并重试:

def _enqueue(self, message_id, data):
    try:
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,
                message_id=message_id
            )
        )

        # Confirm delivery or retry
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout) as e:
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data)
        else:
            raise e
Run Code Online (Sandbox Code Playgroud)

这有时适用于第二次尝试.它经常挂起一段时间或者在最终抛出异常(可能是相关的错误报告)之前抛弃消息.因为它只发生在系统安静几分钟时,我猜它是由于连接超时.但AMQP有一个心跳系统,据报道pika使用它(相关的错误报告).

为什么我会收到此错误或丢失邮件,为什么连接在不使用时不会保持打开状态?

Mat*_*t S 6

来自另一个错误报告

由于 BlockingConnection 不会在后台处理心跳,并且 heartbeat_interval 无法覆盖服务器建议的心跳间隔(这也是一个错误),因此我建议默认情况下禁用心跳(改为依赖 TCP keep-alive)。

如果在消费块中处理任务需要比服务器建议的心跳间隔更长的时间,则服务器将关闭连接,并且客户端在处理完成后将无法确认消息。

v1.0.0 中的更新可能有助于解决该问题。

所以我实施了一个解决方法。每 30 秒我通过队列发布一条心跳消息。这可以保持连接打开,并具有向客户端确认我的应用程序已启动并运行的额外好处。