如何在驼峰中实现事件驱动消费者

sur*_*ura 2 java apache-camel

我是Camel的新手,并且一直在努力了解如何在特定场景中使用骆驼.在这种情况下,有一个(基于Java)代理不时生成操作.我需要一个事件驱动的消费者来获得这些事件的通知.这些事件将被路由到"文件"生成器(暂时).

在骆驼书中,这个例子是针对民意调查的消费者.我找不到一个针对事件驱动的消费者的通用解决方案.我遇到了类似的JMX实现:

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 

JMXEndpoint jmxEndpoint;    
public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 
    super(endpoint, processor);
    this.jmxEndpoint = endpoint;
}

public void handleNotification(Notification notification, Object handback) { 
    try {
        getProcessor().process(jmxEndpoint.createExchange(notification)); 
    } catch (Throwable e) {
        handleException(e); 
    }
}
Run Code Online (Sandbox Code Playgroud)

}

这里,只要JMX通知到达,就会调用handleNotification.

我相信我必须做类似的事情,以便在代理生成动作时通知我的消费者.但是,上面的handleNotification方法特定于JMX.该网页说:"在实现自己的事件驱动的消费者时,您必须确定一个类似的事件监听器方法,以在您的自定义消费者中实现."

我想知道:我如何识别类似的事件监听器,以便在我的代理有动作时通知我的消费者.

任何有关网页的建议/链接都非常感谢.

Mat*_*att 5

我知道这是一个古老的问题,但我一直在努力解决这个问题,并且只是想我会为其他寻找答案的人记录我的发现.

创建Endpoint类(扩展DefaultEndpoint)时,您将覆盖以下创建使用者的方法:

public Consumer createConsumer(Processor processor)
Run Code Online (Sandbox Code Playgroud)

然后,在您的消费者中,您可以访问处理器 - 在此处理器上调用"进程"将创建一个事件并触发该路由.

例如,假设您有一些侦听消息的Java API,并且具有某种侦听器.在我的例子中,Listener将传入的消息放入LinkedBlockingQueue,我的Consumer'doStart'方法看起来像这样(添加你自己的错误处理):

@Override
protected void doStart() throws Exception {
    super.doStart();

    // Spawn a new thread that submits exchanges to the Processor
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            while(true) {
                IMessage incomingMessage = myLinkedBlockingQueue.take();
                Exchange exchange = getEndpoint().createExchange();
                exchange.getIn().setBody(incomingMessage);
                myProcessor.process(exchange);  
            }
        }
    };
    new Thread(runnable).start();
}
Run Code Online (Sandbox Code Playgroud)

现在我可以在我的CamelContext中创建创建此Consumer的Endpoint的Component,并像这样使用它:

from("mycomponent:incoming").to("log:messages");
Run Code Online (Sandbox Code Playgroud)

每次从Java API到达新消息时都会触发日志消息.

希望有人帮助!