RabbitMQ - 为什么消费者从队列中消失

J S*_*ngh 5 java spring rabbitmq

我们的 grail 应用程序使用以下 RabbitMQ 客户端并使用 XML 配置

spring-integration-amqp:3.0.8.RELEASE
spring-rabbit:1.4.5.RELEASE
Run Code Online (Sandbox Code Playgroud)

以下是出现问题的队列之一的 XML 配置:

    <integration:poller id="default" default="true" fixed-rate="2000"/>

    <rabbit:connection-factory id="rabbitConnectionFactory"
                               host="${rabbitmq.host}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               requested-heartbeat="30"/>

    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:template id="amqpTemplate"
                     connection-factory="rabbitConnectionFactory"/>
    <rabbit:queue id="batchUnspscQueue" name="${unspsc.batch.queue}"/>

    <integration:channel id="fromUnspscBatch">
        <integration:queue capacity="10" />
    </integration:channel>

    <amqp:inbound-channel-adapter channel="fromUnspscBatch" concurrent-consumers="1" queue-names="${unspsc.batch.queue}" acknowledge-mode="NONE" channel-transacted="false" connection-factory="rabbitConnectionFactory" />
    <integration:service-activator
            id="erpUnspscBatchSubscriber"
            ref="erpMessageHandlerService"
            method="performUnspscBatchUpdate"
            input-channel="fromUnspscBatch"
            output-channel="nullChannel"/>

Run Code Online (Sandbox Code Playgroud)

在生产中,我们注意到,当“unspsc.batch.queue”负载过重时(即输入消息超过通道内部队列容量,即 10),消费者突然开始从队列中消失(使用 RabbitMQ 管理 UI 注意到)

看起来连接正在重置,但奇怪的是附加到同一连接的所有其他队列的消费者收到关闭消息,然后重新启动,但附加到这个繁忙的特定队列的消费者没有重新启动;消息停止对此队列进行处理,并开始在代理队列中排队。

目前,每当我们遇到这种情况时,我们都需要重新启动应用程序。有时我们需要多次重启应用程序,因为消费者处理了一些消息然后又消失了。

注意到其他队列消费者的以下日志消息重新启动正常:

02.02-07:10:21.202 [SimpleAsyncTaskExecutor-3] INFO  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer -  Restarting Consumer: tags=[{amq.ctag-OaL0P3G4j0VQKGQ16lGlBg=background-worker.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://wluser@10.6.10.4:5672/,44), acknowledgeMode=NONE local queue size=0
...
02.02-07:10:21.200 [SimpleAsyncTaskExecutor-2] WARN  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer -  Consumer raised exception, processing can restart if the connection factory supports itcom.rabbitmq.client.ShutdownSignalException: connection error

Run Code Online (Sandbox Code Playgroud)

我们能够在开发环境中复制这一点,当我们将容量更改为一个巨大的数字时,它不会给我们带来这个问题。因为消费者能够带来所有排队的消息,然后按照自己的节奏进行处理。但这会对应用程序 cpu /ram 产生副作用,并且如果服务器在中间重新启动,则所有消息都将丢失,因为 ACK 模式设置为 NONE。

    <integration:channel id="fromUnspscBatch">
        <integration:queue capacity="100000" />
    </integration:channel>
Run Code Online (Sandbox Code Playgroud)

对此的任何帮助或见解将不胜感激。

1:我们是否缺少任何配置来处理这样的场景,即当容量保持较少(例如 10)并且代理队列中有大量消息(例如 10000)时?

https://docs.spring.io/spring-integration/docs/1.0.x/reference/html/ch03s02.html上注意到以下行。当通道内部队列达到其容量时是否有任何副作用?

If the queue has reached capacity, then the sender will block until room is available
Run Code Online (Sandbox Code Playgroud)

2:还知道为什么附加到同一连接的所有其他队列的消费者收到关闭消息然后重新启动,但附加到这个繁忙的特定队列的消费者没有重新启动?