基本上我的消费者也是生产者.我们得到一个初始数据集,然后将其发送到队列.消费者接受物品并对其进行处理,从那时起有3种可能性:
我的问题在于第3步,因为队列起初非常快,可能会将一段数据分解为队列中重复的部分,并且消费者继续处理它并最终进入无限循环.
我认为防止这种情况的方法是防止重复进入队列.我不能在客户端做到这一点,因为在一个小时的过程中,我可能会有许多核心处理数十亿个数据点(让每个客户端在提交之前对其进行扫描会使我的速度减慢太多).我认为这需要在服务器端完成,但是,正如我所提到的,数据非常大,我不知道如何有效地确保没有重复.
我可能会问不可能,但我想我会试一试.任何想法将不胜感激.
Rom*_*man 11
我想即使您可以解决不向队列发送重复项的问题,您迟早会遇到此问题:
来自RabbitMQ文档:"从故障中恢复:如果客户端由于客户端连接的节点故障而与代理断开连接,如果客户端是发布客户端,则代理可能已接受并且传递来自客户端的消息而客户端没有收到确认消息;同样在消费方面,客户端可能已经发出消息确认,并且不知道这些确认是否已经发送给代理并且之前已经处理过发生了故障.简而言之,您仍然需要确保您的消费客户能够识别和处理重复的消息."
基本上,它看起来像这样,你向rabbitmq发送请求,rabbitmq用ACK回复但是由于1个或那个原因,你的消费者或生产者没有收到这个ACK.Rabbitmq无法知道未收到确认,并且您的制作人最终会重新发送消息,从未收到确认.
处理重复消息很痛苦,尤其是在将消息传递用作一种RPC的应用程序中,但在使用这种消息传递体系结构时,这似乎是不可避免的.
核心问题似乎是这样的:
"...its possible that a piece of data is broken down into a part that's
duplicated in the queue and the consumers continue to process it and
end up in a infinite loop."
Run Code Online (Sandbox Code Playgroud)
您可以随心所欲地专注于排队项目的独特性,但上面的问题是您应该集中精力的地方,IMO。防止无限循环的一种方法可能是在消息有效负载中包含一个“已访问”位,该位由消费者在重新对损坏的项目进行排队之前设置。
另一种选择是让消费者重新排队回到一个特殊的队列,该队列的处理方式略有不同,以防止无限循环。无论哪种方式,您都应该通过将其作为应用程序策略的核心部分来处理该问题,而不是使用消息传递系统的功能来绕过它。