ser*_*ces 6 spring activemq-classic
我已将Spring DefaultMessageListenerContainer配置为消耗来自队列的消息的ActiveMQ使用者.我们称之为"Test.Queue"我将这些代码部署在4台不同的机器上,所有机器都配置到同一个ActiveMQ实例来处理来自同一"Test.Queue"队列的消息.
所有4台机器都启动并运行后,我将最大消费者大小设置为20,我看到消费者数量与队列数相比为80(4*最大消费者大小= 80)
当生成并发送到队列的消息变得很高时,一切都很好.
当有1000个消息时,在80个消费者中,让我们说其中一个被卡住它会冻结Active MQ以停止向其他消费者发送消息.
所有消息都永远停留在ActiveMQ中.
由于我有4台机器,最多有80名消费者,我不知道哪个消费者没有承认.
我停下来重新启动所有4台机器,当我停止那些卡住了坏消费者的机器时,消息再次开始流动.
我不知道如何配置DefaultMessageListenerContainer以放弃坏消费者并立即发信号通知ActiveMQ以开始发送消息.
即使没有Spring,我也可以创建场景,如下所示:
我创建了2个消费者(消费者A,B)并在一个消费者B的onMessage()方法中,我让线程长时间休眠(Thread.sleep(Long.MAX_VALUE)),其条件就像当前时间%13是0然后让线程进入休眠状态.
跑这两个消费者.
假设A,B,C由Spring的DefaultMessageListenerContainer管理,我如何调整Spring DefaultMessageListenerContainer以便在不能确认X秒数后将该坏消费者从池中(在我的情况下是消费者B)中取出,立即确认代理经纪人没有永久保留消息.
谢谢你的时间.
感谢我能解决这个问题.
这里有一些可以尝试的选项...
将队列预取设置为 0 以促进在消费者之间更好的分配并减少特定消费者上的“卡住”消息。请参阅http://activemq.apache.org/what-is-the-prefetch-limit-for.html
在连接上设置“?useKeepAlive=false&wireFormat.maxInactivityDuration=20000”,以在指定的非活动时间后使慢速消费者超时
设置队列策略“slowConsumerStrategy->abortSlowConsumer”...再次使慢速消费者超时
<policyEntry ...
...
<slowConsumerStrategy>
<abortSlowConsumerStrategy />
</slowConsumerStrategy>
...
</policyEntry>
Run Code Online (Sandbox Code Playgroud)| 归档时间: |
|
| 查看次数: |
3675 次 |
| 最近记录: |