Spring DefaultMessageListenerContainer和ActiveMQ

ser*_*ces 6 spring activemq-classic

我已将Spring DefaultMessageListenerContainer配置为消耗来自队列的消息的ActiveMQ使用者.我们称之为"Test.Queue"我将这些代码部署在4台不同的机器上,所有机器都配置到同一个ActiveMQ实例来处理来自同一"Test.Queue"队列的消息.

所有4台机器都启动并运行后,我将最大消费者大小设置为20,我看到消费者数量与队列数相比为80(4*最大消费者大小= 80)

当生成并发送到队列的消息变得很高时,一切都很好.

当有1000个消息时,在80个消费者中,让我们说其中一个被卡住它会冻结Active MQ以停止向其他消费者发送消息.

所有消息都永远停留在ActiveMQ中.

由于我有4台机器,最多有80名消费者,我不知道哪个消费者没有承认.

我停下来重新启动所有4台机器,当我停止那些卡住了坏消费者的机器时,消息再次开始流动.

我不知道如何配置DefaultMessageListenerContainer以放弃坏消费者并立即发信号通知ActiveMQ以开始发送消息.

即使没有Spring,我也可以创建场景,如下所示:

  1. 我最多生成了5000条消息并将它们发送到"Test.Queue"队列
  2. 我创建了2个消费者(消费者A,B)并在一个消费者B的onMessage()方法中,我让线程长时间休眠(Thread.sleep(Long.MAX_VALUE)),其条件就像当前时间%13是0然后让线程进入休眠状态.

  3. 跑这两个消费者.

  4. 去了Active MQ,发现队列有2个消费者.
  5. A和B都在处理消息
  6. 在某个时间点,消费者B的onMessage()被调用,当当前时间%13的条件为0时,它使线程处于休眠状态.
  7. 消费者B被卡住了,它无法向经纪人承认
  8. 我回到Active MQ Web控制台,仍然将消费者视为2,但没有消息出列.
  9. 现在我创建了另一个消费者C并运行它来消费.
  10. 只有ActiveMQ中的消费者数量从2增加到3.
  11. 但是消费者C并没有消费任何东西,因为经纪人没有发送任何持有它们的消息,因为它仍在等待消费者B承认它.
  12. 我还注意到消费者A没有消费任何东西
  13. 我去杀死消费者B,现在所有的消息都被消耗掉了.

假设A,B,C由Spring的DefaultMessageListenerContainer管理,我如何调整Spring DefaultMessageListenerContainer以便在不能确认X秒数后将该坏消费者从池中(在我的情况下是消费者B)中取出,立即确认代理经纪人没有永久保留消息.

谢谢你的时间.

感谢我能解决这个问题.

Ben*_*Day 3

这里有一些可以尝试的选项...

  1. 将队列预取设置为 0 以促进在消费者之间更好的分配并减少特定消费者上的“卡住”消息。请参阅http://activemq.apache.org/what-is-the-prefetch-limit-for.html

  2. 在连接上设置“?useKeepAlive=false&wireFormat.maxInactivityDuration=20000”,以在指定的非活动时间后使慢速消费者超时

  3. 设置队列策略“slowConsumerStrategy->abortSlowConsumer”...再次使慢速消费者超时

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry> 
    
    Run Code Online (Sandbox Code Playgroud)