提高消息消费率

hem*_*vsn 4 java activemq-classic jms spring-jms

我们使用 ActiveMQ 作为消息代理。

对于其中一个队列,我们​​发现消息的生成速率远远高于消息的消费速率。这有时会导致 ActiveMQ 崩溃。

因此,我们正在探索提高消费率的选择。(首要任务是增加现有消费者的比率,然后增加消费者 Pod/实例的数量)。

以下是当前监听器配置

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency("1-1");

    factory.setMessageConverter(jacksonJmsMessageConverter());
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

侦听器逻辑与数据库多次交互,并且可以被限定为 I/O 绑定任务。所以我们正在考虑并行处理多个消息。

我们找到了以下选项。

  1. 我们将并发设置为1-1暗示 concurrentConsumersmaxConcurrentConsumers为 1,并且一次仅处理 1 条消息。这是我们可以增加的配置,5-10以便最少 5 个和最多 10 个消费者能够同时操作。

  2. 我们还发现我们也可以TaskExecutor在监听器工厂中进行设置。就像设置athreadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50)也会帮助我们并发处理消息。

  3. 我不确定使用哪个选项(1 或 2)

  4. 如果我们将并发数增加到 5-10,并设置一个 ThreadPoolExecutor,会有什么行为和影响?
  5. 关于选择参数的最佳值有什么指示吗?

小智 5

我同意前面的答案。

当 Spring DefaultMessageListenerContainer (DMLC) 的消息消费性能较差时,这几乎总是意味着要为每个消息读取重新创建 JMS 使用者/会话/连接。

在非事务性情况下,您可以让 DMLC 缓存消费者。DMLC 将使用单个 JMS 连接,并在该连接上启动多个 JMS 会话。每个会话都会有一个 JMS 消息使用者。

在事务处理的情况下,DMLC 中不会有缓存,并且必须使用缓存连接工厂以避免性能问题。查看org.apache.activemq.pool.PooledConnectionFactory或 来org.messaginghub.pooled.jms.JmsPoolConnectionFactory了解此功能。

至于你的问题,我也懒得理会TaskExecutor。我建议从并发5-5开始。我发现让最小和最大消费者数量相同(只是有一个固定的池)在稳定性和性能方面具有优势。

至于ActiveMQ崩溃,这是由于ActiveMQ预取造成的。默认情况下,ActiveMQ 的预取值为 1000。JMS 消息使用者请求一条消息,代理在预取中传递该消息和 999 个其他消息。如果消费者/会话随后关闭,则 999 消息将在客户端被丢弃,然后在代理上重新排队。这对经纪人来说是非常侮辱性的,并且没有得到很好的处理。

另外,请注意,例如,如果您有 5 个并发消费者,那么第一个消费者将收到 1000 条消息,然后下一个消费者将收到 1000 条消息,依此类推。因此,如果队列中只有 500 条消息,那么只有一个消费者处于活动状态。队列中需要有 5000 条消息才能激活所有 5 个使用者。

如有疑问,请在客户端配置中禁用消息预取:

tcp://broker_uri:61616?jms.prefetchPolicy.all=0