我正在尝试使用camel-amqp(2.17版)组件在我的camel路由中连接到rabbitmq。
我已将其配置如下:
@Bean
CachingConnectionFactory jmsCachingConnectionFactory(){
JmsConnectionFactory pool = new JmsConnectionFactory();
pool.setRemoteURI("amqp://127.0.0.1:5672");
pool.setUsername("guest");
pool.setPassword("guest");
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(pool);
return cachingConnectionFactory;
}
@Bean
JmsConfiguration jmsConfig(){
JmsConfiguration configuration = new JmsConfiguration();
configuration.setConnectionFactory(jmsCachingConnectionFactory());
// configuration.setCacheLevelName("CACHE_CONSUMER");
return configuration;
}
@Bean
AMQPComponent amqp(){
AMQPComponent component = new AMQPComponent();
component.setConfiguration(jmsConfig());
return component;
}
Run Code Online (Sandbox Code Playgroud)
我得到的错误是
javax.jms.JMSException: 远程主机在 org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:66) ~[qpid-jms-client-0.8.0.jar 强制关闭现有连接:0.8.0]
在我的 rabbitmq 日志中,我可以看到以下我无法理解的消息
*
** Reason for termination ==
** {function_clause,
[{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
[{list,
[{symbol,<<"amqp:accepted:list">>},
{symbol,<<"amqp:rejected:list">>},
{symbol,<<"amqp:released:list">>},
{symbol,<<"amqp:modified:list">>}]}],
[{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
{rabbit_amqp1_0_link_util,outcomes,1,
[{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
{rabbit_amqp1_0_outgoing_link,attach,3,
[{file,"src/rabbit_amqp1_0_outgoing_link.erl"},{line,41}]},
{rabbit_amqp1_0_session_process,with_disposable_channel,2,
[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,377}]},
{rabbit_amqp1_0_session_process,handle_control,2,
[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,197}]}, …Run Code Online (Sandbox Code Playgroud) 我正在解析一个CSV文件,将其拆分并通过camel中的多个处理器进行路由.有两个端点,一个具有错误数据,另一个端点具有验证数据.
我需要汇总数据的建议.
假设CSV文件有10条记录,其中6条记录到达一个端点,而4条记录到达另一个端点.如何知道是否所有10个文件都已从每个端点的文件中完成并超过聚合器.我需要创建两个文件,一个包含有效数据,另一个包含来自单个文件的损坏数据.