与ActiveMQ并行处理多条消息

djB*_*jBo 3 parallel-processing activemq-classic apache-camel

我想使用简单的Processor/AsyncProcessor作为目标并行处理队列中的消息.处理器每条消息需要一点时间,但每条消息可以单独处理,因此可以同时处理(在健康的边界内).

我很难找到例子,尤其是关于驼峰路线的xml配置.

到目前为止,我已经定义了一个线程池,路由和处理器:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
    <from uri="broker:queue:inbox" />
    <threads executorServiceRef="smallPool">
        <to uri="MyProcessor" />
    </threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />
Run Code Online (Sandbox Code Playgroud)

我的处理器看起来像:

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        String msg = in.getBody(String.class);      
        System.out.println(msg);
        try {
            Thread.sleep(10 * 1000); // Do something in the background
        } catch (InterruptedException e) {}
        System.out.println("Done!");
    }
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,当我将消息发布到队列时,它们仍然被逐个处理,每个消息延迟10秒(我的"后台任务").

任何人都可以指出我使用定义的线程池处理消息的正确方向或解释我做错了什么?

Cla*_*sen 6

您应该使用concurrentConsumers选项,如评论中所述,

<route>
    <from uri="broker:queue:inbox?concurrentConsumers=5" />
    <to uri="MyProcessor" />
</route>
Run Code Online (Sandbox Code Playgroud)

请注意,maxConcurrentConsumers您还可以设置使用最小/最大并发消费者范围,因此Camel将根据负载自动增长/缩小.

请参阅JMS文档中的更多详细信息

  • 您可以从hawtio Web控制台使用JMX /进行更改 (2认同)