消费者没有从ActiveMQ接收消息

Man*_*har 7 activemq-classic spring-jms

我们面临着ActiveMQ及其消费者的随机问题.我们观察到,即使它们连接到ActiveMQ队列,也很少有消费者没有收到消息.但在消费者重新启动后它可以正常工作.

我们在ActiveMQ端有一个名为testQueue的队列.消费者正在尝试从此队列中取消队列消息.我们正在使用Spring的DefaultMessageListenerContainer来实现此目的.消息正从ActiveMQ Broker传递到使用者节点.从tcpdump来看,很明显,消息正在到达消费者节点,但实际的消费者代码无法看到消息.换句话说,消息似乎停留在ActiveMQ使用者代码或Spring的DefaultMessageListenerContainer中.

请参阅下图.为了更清楚地解决这个问题.消息正在到达消费者节点,但它没有到达"实际消费者类",这意味着消息卡在AMQ消费者代码或Spring DMLC中.

在此输入图像描述

以下是从ActiveMQ管理员处捕获的详细信息.

队列名称/待定消息计数/消费者计数/消息排队/消息排队testQueue/9/1/9/0

以下是更多细节.

Connection-ID/SessionId/Selector/Enqueues/Dequeues/Dispatched/Dispatched-Queue/Prefetch ID:bearsvir52-45176-1375519181268-3:5/1// 9/0/9/9/250

从第二个表中可以明显看出,消息正在传递给消费者,但消费者并未确认消息.因此,消息被卡在代理端的Dispatched-Queue中.

您的通知几点:

1)b/w Broker节点和消费者节点没有时间差.

2)在消费者端观察到tcpdump.我们可以看到MessageDispatch(Openwire)数据包被转移到消费者节点,但是找不到MessageAck(Openwire).

3)有时它在节点上工作,有时它在同一节点上创建问题.

Man*_*har 2

花了很多时间才找到解决方案。在 AMQ 故障转移的情况下,org.apache.activemq.ActiveMQConnection.java类似乎存在一些问题。在这种情况下,连接对象不会在消费者端启动。

以下是我在 ActiveMQConnection.java 文件中添加的修复并编译源以创建activemq-core-xxxjar

private final Object startMutex = new Object();
Run Code Online (Sandbox Code Playgroud)

在 createSession 方法中添加了检查

public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    synchronized (startMutex) {
        if(!isStarted()) {
            start();
        }
    }
Run Code Online (Sandbox Code Playgroud)