允许 RabbitMQ 和 Pika 保持连接始终打开

Avi*_*ión 4 python rabbitmq pika rabbitmq-exchange

我有一个 Python 脚本,它从流中读取内容,当读取新字符串时,它将其内容(字符串)推送到 RabbitMQ 队列。

问题是流可能不会在 1、2 或 9 小时左右发送消息,所以我希望RabbitMQ 连接始终打开

问题是当我创建连接和通道时:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
channel = self.connection.channel()
channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
Run Code Online (Sandbox Code Playgroud)

...如果一个小时后收到一条消息,我会收到此错误:

  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/var/opt/rabbitmq-agent.py", line 34, in push_to_queue
    raise Exception("Error sending the message to the queue: " + format(e))
Exception: Error sending the message to the queue: Send message to publisher error: Channel allocation requires an open connection: <SelectConnection CLOSED socket=None params=<ConnectionParameters host=x port=xvirtual_host=/ ssl=False>>
Run Code Online (Sandbox Code Playgroud)

我认为rabbitmq服务器和客户端之间的连接已关闭。

我怎样才能避免这种情况?我想要一句“请永远保持联系”。也许在Pika的连接参数中设置一个超级大的心跳?像这样的东西:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials, heartbeat=6000))
Run Code Online (Sandbox Code Playgroud)

任何其他冷却器解决方案将受到高度赞赏。

提前致谢

Ton*_*vić 8

我建议您每次发送消息之前检查连接,如果连接关闭,则只需重新连接即可。

if not self.connection or self.connection.is_closed:
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
    channel = self.connection.channel()
    channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
Run Code Online (Sandbox Code Playgroud)

  • AFAIK,self.connection.is_closure 将保持 False,即使连接断开,直到对rabbitmq进行 pika 调用。请参阅 - https://github.com/pika/pika/issues/877 (3认同)