我是Camel的新手,并且一直在努力了解如何在特定场景中使用骆驼.在这种情况下,有一个(基于Java)代理不时生成操作.我需要一个事件驱动的消费者来获得这些事件的通知.这些事件将被路由到"文件"生成器(暂时).
在骆驼书中,这个例子是针对民意调查的消费者.我找不到一个针对事件驱动的消费者的通用解决方案.我遇到了类似的JMX实现:
public class JMXConsumer extends DefaultConsumer implements NotificationListener {
JMXEndpoint jmxEndpoint;
public JMXConsumer(JMXEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.jmxEndpoint = endpoint;
}
public void handleNotification(Notification notification, Object handback) {
try {
getProcessor().process(jmxEndpoint.createExchange(notification));
} catch (Throwable e) {
handleException(e);
}
}
Run Code Online (Sandbox Code Playgroud)
}
这里,只要JMX通知到达,就会调用handleNotification.
我相信我必须做类似的事情,以便在代理生成动作时通知我的消费者.但是,上面的handleNotification方法特定于JMX.该网页说:"在实现自己的事件驱动的消费者时,您必须确定一个类似的事件监听器方法,以在您的自定义消费者中实现."
我想知道:我如何识别类似的事件监听器,以便在我的代理有动作时通知我的消费者.
任何有关网页的建议/链接都非常感谢.
我知道这是一个古老的问题,但我一直在努力解决这个问题,并且只是想我会为其他寻找答案的人记录我的发现.
创建Endpoint类(扩展DefaultEndpoint)时,您将覆盖以下创建使用者的方法:
public Consumer createConsumer(Processor processor)
Run Code Online (Sandbox Code Playgroud)
然后,在您的消费者中,您可以访问处理器 - 在此处理器上调用"进程"将创建一个事件并触发该路由.
例如,假设您有一些侦听消息的Java API,并且具有某种侦听器.在我的例子中,Listener将传入的消息放入LinkedBlockingQueue,我的Consumer'doStart'方法看起来像这样(添加你自己的错误处理):
@Override
protected void doStart() throws Exception {
super.doStart();
// Spawn a new thread that submits exchanges to the Processor
Runnable runnable = new Runnable() {
@Override
public void run() {
while(true) {
IMessage incomingMessage = myLinkedBlockingQueue.take();
Exchange exchange = getEndpoint().createExchange();
exchange.getIn().setBody(incomingMessage);
myProcessor.process(exchange);
}
}
};
new Thread(runnable).start();
}
Run Code Online (Sandbox Code Playgroud)
现在我可以在我的CamelContext中创建创建此Consumer的Endpoint的Component,并像这样使用它:
from("mycomponent:incoming").to("log:messages");
Run Code Online (Sandbox Code Playgroud)
每次从Java API到达新消息时都会触发日志消息.
希望有人帮助!
| 归档时间: |
|
| 查看次数: |
5160 次 |
| 最近记录: |