Mic*_*ung 6 java activemq-classic
我在使用 ActiveMQ 时遇到了问题,我发现它很难确定,甚至很难重现,也很难提出具体问题。请多多包涵。
基本上我有:
producer (prioritized messages) -> queue -> consumer
Run Code Online (Sandbox Code Playgroud)
通常,队列中有几条 10 万条消息,每当具有更高优先级的消息到达时,它们首先被消耗。
这工作正常,直到星星对齐并且写入队列的高优先级消息不会被消耗。至少在我打电话Queue.removeMatchingMessages(String selector)从队列中删除消息之前- 哪个和多少都无关紧要。
幸运的是,我发现了一个强有力的指标来说明正在发生的事情。
从我们的 UI 中可以看出,我提交了 444 条优先级 (3) 比其余 (1) 更高的消息,但它们没有被消耗:
使用调试器检查队列,我发现它StoreQueueCursor.pendingCount是 444:
如果我再提交 72 条消息,则待处理计数为 516 (444 + 72):
当我然后使用 删除 72 条消息时Queue.removeMatchingMessages(String selector),StoreQueueCursor.pendingCount变为 0:
我的 444 条消息突然被消耗了:
所以我现在能问的最好的问题是:
它的目的是什么StoreQueueCursor以及它如何导致我的消息被消费?或者更确切地说:为什么这些消息没有写入队列并准备好被消费?
任何帮助深表感谢。
我正在使用org.apache.activemq:activemq-broker:5.15.12(通过 Spring Boot 2.3.1.RELEASE)。
有趣的是,在我所有的高优先级消息都得到处理的“快乐情况”中,pendingCount它远高于 0:
在 ActiveMQ 的如何支持优先队列?它说:
由于消息游标(和客户端)实现了严格的优先级排序,如果消息分派可以从缓存中发生并且不必访问磁盘(即,您的消费者足够快以跟上生产者),则可以遵守严格的优先级排序),或者如果您使用的是永远不必刷新到磁盘的非持久性消息(使用 FilePendingMessageCursor)。但是,一旦遇到消费者速度慢或生产者速度明显加快的情况,您会观察到缓存将填满(可能是优先级较低的消息),而优先级较高的消息会卡在磁盘上,直到它们才可用。重新分页。在这种情况下,您可以决定权衡优化的消息调度以实现优先级实施。您可以禁用缓存,消息过期检查,
所以我尝试像这样禁用缓存(顺便说一下,我已经jms.prefetchPolicy.all=0设置了):
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true);
policyEntry.setUseCache(false);
policyEntry.setExpireMessagesPeriod(0);
Run Code Online (Sandbox Code Playgroud)
现在,useCache是假的,但是cacheEnabled是真的:
但是可以观察到相同的行为。
另外,我总是关闭 Broker-Persistence,所以我不确定上述情况是否适用:
@Bean
public BrokerService broker(ActiveMQProperties properties, DispatcherProperties dispatcherProperties) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
brokerService.addConnector(properties.getBrokerUrl());
brokerService.setPlugins(getPluginsToLoad());
brokerService.setDestinationPolicy(policyMap());
return brokerService;
}
Run Code Online (Sandbox Code Playgroud)
来自 JMX 的信息: