我遇到的问题仅发生在我的 Pika 发行商端。
项目架构:
- Kubernetes cluster:
- Flask Server Pod:
- publisher running
- Consumer Pod:
- Consumer Running
Run Code Online (Sandbox Code Playgroud)
发布者正在运行一个 BlockingConnection 类以供单线程使用。消费者正在运行 SelectConnection 类以供多线程使用。
每次我收到对我的烧瓶服务器的请求时,我都会进行一些数据处理,然后将消息发布到我的相关队列
例如,这就是我发布到第一个队列的方式:
def publish_queue_1_message(self, message):
with self.QUEUES[self.QUEUE_1]["lock"]:
self._publish_message(self.QUEUE_1, message)
@pika_publish_decorator
def _publish_message(self, queue, message):
if self.QUEUES[queue]["channel"] is None or not self.QUEUES[queue]["channel"].is_open:
raise RabbitNoChannelException
self.QUEUES[queue]["channel"].basic_publish(
self.EXCHANGE,
self.QUEUES[queue]["routing_key"],
message
)
Run Code Online (Sandbox Code Playgroud)
注意: self.QUEUES 是一个字典,保存我在创建通道和线程锁后分配的通道。我使用锁来锁定通道,一次只发布一条消息,以防我从 Flask 服务器收到大量请求
@pika_publish_decorator 是一个装饰器,它处理如果出现任何错误,则尝试重新创建连接和通道并尝试再次运行该函数的情况。
由于某种原因,我从 pika.adapters.utils.io_services_utils 记录器多次收到此错误。
_AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.240.0.165', 47600)>; Caller's …Run Code Online (Sandbox Code Playgroud)