Python3 RabbitMQ Pika ConnectionResetError 仅在发布者上

Yan*_*iva 5 python rabbitmq python-3.x pika

我遇到的问题仅发生在我的 Pika 发行商端。

项目架构:

- Kubernetes cluster:
    - Flask Server Pod:
        - publisher running
    - Consumer Pod:
        - Consumer Running
Run Code Online (Sandbox Code Playgroud)

发布者正在运行一个 BlockingConnection 类以供单线程使用。消费者正在运行 SelectConnection 类以供多线程使用。

每次我收到对我的烧瓶服务器的请求时,我都会进行一些数据处理,然后将消息发布到我的相关队列

例如,这就是我发布到第一个队列的方式:

def publish_queue_1_message(self, message):
    with self.QUEUES[self.QUEUE_1]["lock"]:
        self._publish_message(self.QUEUE_1, message)

@pika_publish_decorator
def _publish_message(self, queue, message):
    if self.QUEUES[queue]["channel"] is None or not self.QUEUES[queue]["channel"].is_open:
        raise RabbitNoChannelException

    self.QUEUES[queue]["channel"].basic_publish(
        self.EXCHANGE,
        self.QUEUES[queue]["routing_key"],
        message
    )
Run Code Online (Sandbox Code Playgroud)

注意: self.QUEUES 是一个字典,保存我在创建通道和线程锁后分配的通道。我使用锁来锁定通道,一次只发布一条消息,以防我从 Flask 服务器收到大量请求

@pika_publish_decorator 是一个装饰器,它处理如果出现任何错误,则尝试重新创建连接和通道并尝试再次运行该函数的情况。

由于某种原因,我从 pika.adapters.utils.io_services_utils 记录器多次收到此错误。

_AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.240.0.165', 47600)>; Caller's stack:
Traceback (most recent call last):

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 1103, in _on_socket_writable
    self._produce()

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 819, in _produce
    num_bytes_sent = self._sigint_safe_send(self._sock,

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)

ConnectionResetError: [Errno 104] Connection reset by peer
Run Code Online (Sandbox Code Playgroud)

如果你能帮我解决这个问题,那就太棒了!非常感谢!