max*_*der 7 java spring scalability jms spring-jms
我有一个DefaultMessageListenerContainer,(在我看来)没有扩展.Container被定义为侦听队列,其中有100条消息.
我希望,Container可以达到任何长度,消息将尽可能快地消耗(通过观察maxConcurrentConsumers配置).所以我认为,有7个并发消费者.(在容器启动时由2个concurrentConsumers开始)一些日志记录信息:
activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7
Run Code Online (Sandbox Code Playgroud)
我的Spring-config(它的一部分):
<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
<property name="connectionFactory" ref="jmscfCee" />
<property name="maxConcurrentConsumers" value="7"/>
<property name="receiveTimeout" value="100000" />
<property name="concurrentConsumers" value="2" />
</bean>
<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
<property name="destinationName" value="MY.QUEUE" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>
Run Code Online (Sandbox Code Playgroud)
我的记录容器
public class LoggingListenerContainer extends DefaultMessageListenerContainer{
private static final Logger logger = Logger
.getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
throws JMSException {
logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
logger.info("concurrentConsumers: " + this.getConcurrentConsumers());
logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
super.doInvokeListener(listener, message);
}
Run Code Online (Sandbox Code Playgroud)
我的听众课程:
public class ListenerClass implements MessageListener {
public void onMessage(Message msg) {
//Do some business function
}
}
Run Code Online (Sandbox Code Playgroud)
有人可以如此友好地纠正我的配置或给我一些关于我的配置的tipps或解释我的容器的方法?(如果我误解了某些东西)
我在本地使用ActiveMQ进行测试(使用WebSphere MQ进行生产) - 如果它与可伸缩性主题相关.
编辑:
<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${jmscfCee.hostName}</value>
</property>
</bean>
<bean id="jmscfCeeCachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory ">
<constructor-arg ref="jmscfCee" />
<property name="sessionCacheSize" value="10" />
</bean>
Run Code Online (Sandbox Code Playgroud)
这取决于。几年前,我在 ActiveMQ 上遇到了类似的问题,它的默认行为针对大量(数千)小消息进行了大量优化。默认情况下,每个消费者将分批预取 1000 条消息,因此如果您的消息数量很少,您可能会发现它们都已在一个消费者的预取缓冲区中结束,而让其他消费者处于空闲状态。
您可以在连接 URI 或 Spring 配置中使用预取策略调整此行为,如果这是您构建连接工厂的方式。
<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
<property name="prefetchPolicy">
<amq:prefetchPolicy all="1" />
</property>
</amq:connectionFactory>
Run Code Online (Sandbox Code Playgroud)
我当时使用的 ActiveMQ 版本不支持预取限制为 0(即不预取,每次都去代理),但文档表明现在允许这样做。
| 归档时间: |
|
| 查看次数: |
5516 次 |
| 最近记录: |