Gri*_*iff 9 java concurrency spring multithreading activemq-classic
我有一个使用ActiveMQ版本5.10的Spring JMS应用程序.我正在执行简单的并发测试.我使用Spring Boot,当前版本和注释来配置JMSListener和消息生成器.
消息生成器只是尽可能快地在队列上抛出消息.消息侦听器将消息从队列中拉出,但在获取消息后休眠1秒钟 - 模拟消息侦听器在获取消息后需要执行的一些工作.
我将JMSListener设置为100-1000个并发线程.如果我开始在同一时间的消息生产者和消费者(无论是在自己的JVM上运行),消费者从来没有得到最低配置的螺纹上方,尽管最大范围设为1000.
如果我让生产者首先开始并在队列上放置几千条消息,然后启动一个或多个消费者实例,它将稳定地提升线程,从100开始,然后每秒20个左右的线程,直到达到状态队列中有大约20-30条消息在飞行中.它永远不会捕获生成器 - 即使消费者没有接近其maxConcurrency计数,也总会有一些消息在队列中.
为什么消息使用者没有突然进入一堆额外的线程来清空队列而不是让队列中有20-30条消息呢?消费者是否有办法继续快速添加线程以便赶上队列中的消息?
以下是代码的相关部分.
消息制作者
@Component
public class ClientServiceImpl implements ClientService {
private static final String QUEUE="message.test.queue";
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void submitMessage(ImportantMessage importantMessage) {
System.out.println("*** Sending " + importantMessage);
jmsTemplate.convertAndSend(QUEUE, importantMessage);
}
}
Run Code Online (Sandbox Code Playgroud)
消息消费者
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(AmqConsumerApplication.class, args);
}
@Value("${JMSHost}")
private String JMS_BROKER_URL;
@Autowired
static Command command;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
监听器配置为这样......
@Component
public class TransformationListener {
private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";
@JmsListener(destination=QUEUE, concurrency = "100-1000")
public void handleRequest(ImportantMessage importantMessage) {
System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
您还面临这种行为吗?您是否在http://activemq.apache.org/what-is-the-prefetch-limit-for.html上阅读了这条建议“池化消费者和预取” 您是否尝试过使用 prefetchSize=0 或 1 ?我认为1可以解决你的问题。如果 prefetchSize > 1,您可能需要将 AbortSlowAckConsumerStrategy 减少到低于默认的 30 秒。在您的情况下,要让超过 100 个线程消费消息,您需要有超过 1000 条未消费且未在队列中预取的消息,因为 prefetchSize 为 10。
| 归档时间: |
|
| 查看次数: |
3845 次 |
| 最近记录: |