pbi*_*len 8 java architecture concurrency apache-camel enterprise-integration
我有以下情况:
我启动Camel路线如下:
public class MyMessage implements Runnable {
public void run() {
// omitted here
}
}
from("netty:tcp://localhost:7777?textline=true&sync=false")
... // omitted here: parse message to pojo MyMessage, set header "group-identifier"
.to(seda:process);
Run Code Online (Sandbox Code Playgroud)
此Camel路由使用TCP流,解析每个传入消息的有效负载并将其转换为MyMessagepojo,并group-identifier在交换机上设置与消息对应的标头...
现在我想消费seda:process如下:
run().我想为此提供/定义一个ExecutorService,所以我可以控制线程数.我可以在这里应用哪些企业集成模式?我如何将这些概念映射到Camel?
我了解到ActiveMQ具有消息组的概念(http://activemq.apache.org/message-groups.html).这可能提供一种方法来确保同一组中的两条消息永远不会同时执行.虽然,我不确定仅为此引入ActiveMQ并不是一种矫枉过正.这可以通过'核心'Camel/Java来实现吗?
在ActiveMQ中很容易做到这一点.以下代码段根据需要模拟执行消息:
这依赖于http://activemq.apache.org/message-groups.html上解释的ActiveMQ消息组.
final CamelContext context = new DefaultCamelContext();
context.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false"));
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("activemq:queue:q?concurrentConsumers=5")
.process(exchange -> {
System.out.println(Thread.currentThread() + " - " + exchange.getIn().getBody());
Thread.sleep(5000);
});
}
});
context.start();
for (int i = 0; i < 1000; ++i) {
context.createFluentProducerTemplate()
.withBody("This is a message from group : " + (i % 5))
.withHeader("JMSXGroupID", "" + (i % 5))
.to("activemq:queue:q")
.send();
}
Run Code Online (Sandbox Code Playgroud)
也就是说,我(仍然)想知道这是否可以用纯EIP/Camel-core完成.
| 归档时间: |
|
| 查看次数: |
200 次 |
| 最近记录: |