DefaultJmsListenerContainerFactory配置进程并发

jav*_*Try 0 java spring jms spring-jms

我正在为 JmsFactory 配置并发性以便使用我的队列。

我这样做了:

  @Bean(name = "myFactory")
  public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
      CustomJmsListenerConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-3");
    return factory;
  }
Run Code Online (Sandbox Code Playgroud)

我看到DefaultJmsListenerContainerFactory.setConcurrency后面调用了DefaultMessageListenerContainer。

我在我的应用程序中配置了 2 个队列,我正在使用 spring boot:

@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")
Run Code Online (Sandbox Code Playgroud)

我正在阅读 spring 文档并遇到一些方法,现在我有一些疑问。

1 - 有什么区别:

setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)
Run Code Online (Sandbox Code Playgroud)

即使阅读文档,我也不理解其中的区别以及此配置如何改变应用程序行为。我认为 setConcurrency 应该是每个 @jmsLister 用于从队列获取消息的线程数...您能解释一下我有 100 条消息排队(每个配置队列)的配置映像示例吗?

2 - setMaxMessagesPerTask(int maxMessagesPerTask)

如果队列中有 100 条消息,并发数 = 3,并且该数字为 10(默认),那么行为是什么?

Gar*_*ell 5

请阅读这两种情况的 Javadoc。

1.

setConcurrency(String concurrency)

这只是一个方便

/**
 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
 * <p>This listener container will always hold on to the minimum number of consumers
 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
 */
@Override
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        else {
            setConcurrentConsumers(1);
            setMaxConcurrentConsumers(Integer.parseInt(concurrency));
        }
    }
    catch (NumberFormatException ex) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
                "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}
Run Code Online (Sandbox Code Playgroud)

setConcurrency("1-3")是相同的

setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
Run Code Online (Sandbox Code Playgroud)
  1. 对消息处理没有影响;它只是意味着消费者任务每 10 条消息就会被回收(停止和启动)。
setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
Run Code Online (Sandbox Code Playgroud)