小编Yan*_*iva的帖子

Python3 RabbitMQ Pika ConnectionResetError 仅在发布者上

我遇到的问题仅发生在我的 Pika 发行商端。

项目架构:

- Kubernetes cluster:
    - Flask Server Pod:
        - publisher running
    - Consumer Pod:
        - Consumer Running
Run Code Online (Sandbox Code Playgroud)

发布者正在运行一个 BlockingConnection 类以供单线程使用。消费者正在运行 SelectConnection 类以供多线程使用。

每次我收到对我的烧瓶服务器的请求时,我都会进行一些数据处理,然后将消息发布到我的相关队列

例如,这就是我发布到第一个队列的方式:

def publish_queue_1_message(self, message):
    with self.QUEUES[self.QUEUE_1]["lock"]:
        self._publish_message(self.QUEUE_1, message)

@pika_publish_decorator
def _publish_message(self, queue, message):
    if self.QUEUES[queue]["channel"] is None or not self.QUEUES[queue]["channel"].is_open:
        raise RabbitNoChannelException

    self.QUEUES[queue]["channel"].basic_publish(
        self.EXCHANGE,
        self.QUEUES[queue]["routing_key"],
        message
    )
Run Code Online (Sandbox Code Playgroud)

注意: self.QUEUES 是一个字典,保存我在创建通道和线程锁后分配的通道。我使用锁来锁定通道,一次只发布一条消息,以防我从 Flask 服务器收到大量请求

@pika_publish_decorator 是一个装饰器,它处理如果出现任何错误,则尝试重新创建连接和通道并尝试再次运行该函数的情况。

由于某种原因,我从 pika.adapters.utils.io_services_utils 记录器多次收到此错误。

_AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.240.0.165', 47600)>; Caller's …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq python-3.x pika

5
推荐指数
0
解决办法
483
查看次数

标签 统计

pika ×1

python ×1

python-3.x ×1

rabbitmq ×1