ale*_*tsa 10 error-handling transactions multicast apache-camel
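I have a Spring Boot (2.3.2.RELEASE) application with Camel (3.5.0) and two test routes.
The idea is to check what happens when an exception occurs in a sub-route called from a multicast, and how that interacts with transactions.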
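import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class MyRoute1 extends RouteBuilder {
    @Override
    public void configure() {
        // Error handler for the parent route: log, print the cause, mark as handled
        onException(Exception.class)
            .useOriginalBody()
            .log("Error handler parent. Body is: ${body}")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                    System.out.println(cause.getMessage());
                    cause.printStackTrace(System.out);
                }
            })
            .handled(true);

        from("jms:queue:EventsQueue")
            .routeId("route1")
            .onCompletion().log("On complete parent").end()
            .transacted()
            // Sequential multicast (parallelProcessing = false) into the child route
            .multicast(AggregationStrategies.useOriginal(), false)
                .to("direct:route2")
                .log("Second step")
            .end()
            .log("Third step");
    }
}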
and
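import java.io.IOException;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class MyRoute2 extends RouteBuilder {
    @Override
    public void configure() {
        // Error handler for the child route: logs only, does not mark the exception as handled
        onException(IOException.class)
            .log("Error handling child");

        from("direct:route2")
            .routeId("route2")
            .onCompletion().log("On complete child").end()
            .log("First step")
            .throwException(new IOException("Very bad exception"));
    }
}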
When I send a message to jms:queue:EventsQueue, I expect the exchange to fail with my exception and the custom log messages to appear in this order:
First step
Error handling child
On complete child
On complete parent
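No other log messages should be triggered.
But I see the following behaviour instead: a java.lang.NullPointerException is thrown by the multicast aggregation strategy and caught by the parent onException block. So the order of the custom logs is: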
First step
Error handling child
On complete child
Error handler parent
null
java.lang.NullPointerException
at org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy.aggregate(UseOriginalAggregationStrategy.java:62)
...
On complete parent
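Here comes the interesting part: if I remove the .transacted() call from the parent route, the behaviour changes. The java.lang.NullPointerException is still thrown by the aggregation strategy, but it is no longer caught by my parent onException block.
Camel log with .transacted():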
2020-11-03 14:38:35.800 INFO 11744 --- [ main] a.test.errors.MySpringBootApplication : Started MySpringBootApplication in 3.44 seconds (JVM running for 3.916)
2020-11-03 14:38:46.389 INFO 11744 --- [[EventsPsQueue]] route2 : First step
2020-11-03 14:38:46.394 INFO 11744 --- [[EventsPsQueue]] route2 : Error handling child
2020-11-03 14:38:46.398 ERROR 11744 --- [[EventsPsQueue]] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_3 on ExchangeId: ID-wsc-111-71a-1604403526383-0-2). Exhausted after delivery attempt: 1 caught: java.io.IOException: Very bad exception. Processed by failure processor: FatalFallbackErrorHandler[Channel[log5]]
Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [from[jms://queue:EventsPsQueue] ] [ 25]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.io.IOException: Very bad exception
at activemq.test.errors.chapter1.MyRoute2.configure(MyRoute2.java:20) ~[classes/:na]
at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1185) ~[camel-base-3.5.0.jar:3.5.0]
at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:93) ~[camel-main-3.5.0.jar:3.5.0]
at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:101) ~[camel-spring-boot-3.5.0.jar:3.5.0]
at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:57) ~[camel-spring-boot-3.5.0.jar:3.5.0]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:898) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.2.RELEASE.jar:2.3.2.RELEASE]
at activemq.test.errors.MySpringBootApplication.main(MySpringBootApplication.java:13) ~[classes/:na]
2020-11-03 14:38:46.400 WARN 11744 --- [[EventsPsQueue]] o.a.c.s.spi.TransactionErrorHandler : Transaction rollback (0x347b370c) redelivered(false) for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_3 on ExchangeId: ID-wsc-111-71a-1604403526383-0-2) caught: java.io.IOException: Very bad exception
2020-11-03 14:38:46.401 INFO 11744 --- [[EventsPsQueue]] route2 : On completion child
2020-11-03 14:38:46.401 INFO 11744 --- [[EventsPsQueue]] route1 : Error handler parent. Body is: TEST MESSAGE
null
java.lang.NullPointerException
at org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy.aggregate(UseOriginalAggregationStrategy.java:62)
at org.apache.camel.AggregationStrategy.aggregate(AggregationStrategy.java:86)
at org.apache.camel.processor.MulticastProcessor.doAggregateInternal(MulticastProcessor.java:628)
at org.apache.camel.processor.MulticastProcessor.doAggregateSync(MulticastProcessor.java:609)
at org.apache.camel.processor.MulticastProcessor.doAggregate(MulticastProcessor.java:594)
at org.apache.camel.processor.MulticastProcessor$MulticastTask.aggregate(MulticastProcessor.java:413)
at org.apache.camel.processor.MulticastProcessor$MulticastTask.lambda$null$0(MulticastProcessor.java:393)
at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:55)
at org.apache.camel.processor.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:186)
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:129)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
at org.apache.camel.processor.MulticastProcessor$MulticastTask.lambda$run$1(MulticastProcessor.java:367)
at org.apache.camel.util.concurrent.AsyncCompletionService$Task.run(AsyncCompletionService.java:150)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:273)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:164)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:261)
at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:90)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:145)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
at org.apache.camel.spring.spi.TransactionErrorHandler.access$201(TransactionErrorHandler.java:45)
at org.apache.camel.spring.spi.TransactionErrorHandler$2.process(TransactionErrorHandler.java:238)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:235)
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:110)
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:123)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:723)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:632)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:181)
at org.apache.camel.spring.spi.TransactionErrorHandler.access$201(TransactionErrorHandler.java:45)
at org.apache.camel.spring.spi.TransactionErrorHandler$2.process(TransactionErrorHandler.java:238)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.spring.spi.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:235)
at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:198)
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.apache.camel.spring.spi.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:191)
at org.apache.camel.spring.spi.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:146)
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:114)
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:123)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:90)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:130)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-11-03 14:38:46.407 INFO 11744 --- [[EventsPsQueue]] route1 : On complete parent
2020-11-03 14:38:46.411 WARN 11744 --- [[EventsPsQueue]] c.c.j.DefaultJmsMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'EventsPsQueue' - trying to recover. Cause: Transaction rolled back because it has been marked as rollback-only
Camel log without transaction support:
2020-11-03 14:40:27.748 INFO 11981 --- [ main] a.test.errors.MySpringBootApplication : Started MySpringBootApplication in 3.234 seconds (JVM running for 3.643)
2020-11-03 14:40:36.828 INFO 11981 --- [[EventsPsQueue]] route2 : First step
2020-11-03 14:40:36.832 INFO 11981 --- [[EventsPsQueue]] route2 : Error handling child
2020-11-03 14:40:36.837 ERROR 11981 --- [[EventsPsQueue]] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: queue_EventsPsQueue_ID_wsc-111-71a-44245-1603872340334-4_9_1_1_4 on ExchangeId: ID-wsc-111-71a-1604403636824-0-2). Exhausted after delivery attempt: 1 caught: java.io.IOException: Very bad exception. Processed by failure processor: FatalFallbackErrorHandler[Channel[log5]]
Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [from[jms://queue:EventsPsQueue] ] [ 24]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.io.IOException: Very bad exception
at activemq.test.errors.chapter1.MyRoute2.configure(MyRoute2.java:20) ~[classes/:na]
at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:483) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:430) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:405) ~[camel-core-engine-3.5.0.jar:3.5.0]
at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1185) ~[camel-base-3.5.0.jar:3.5.0]
at org.apache.camel.main.RoutesConfigurer.configureRoutes(RoutesConfigurer.java:93) ~[camel-main-3.5.0.jar:3.5.0]
at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:101) ~[camel-spring-boot-3.5.0.jar:3.5.0]
at org.apache.camel.spring.boot.CamelSpringBootApplicationListener.onApplicationEvent(CamelSpringBootApplicationListener.java:57) ~[camel-spring-boot-3.5.0.jar:3.5.0]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.su
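For onException there are two more options that can be set to true or false: handled and continued. I think these explain how the exception propagates to the parent route.
If handled is true, the thrown exception is treated as handled and Camel does not continue routing in the original route; it breaks out of it instead.
Camel 2.3 introduced a new option, continued, which lets you both handle the exception and continue routing in the original route as if the exception had not occurred.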
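To make the difference concrete, here is a minimal sketch, not taken from the question's project. It assumes camel-core 3.x on the classpath, and the class name, the endpoint names (direct:handled, direct:continued) and the body strings are made up for illustration. With handled(true) the caller should get the body set inside onException; with continued(true) routing should carry on past the failing step.

```java
import java.io.IOException;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class HandledVsContinued {

    /** Starts a throwaway context, sends "input" to the endpoint and returns the reply body. */
    static String send(String endpointUri) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // handled(true): stop routing in the original route and
                // return whatever this block produces to the caller
                onException(IOException.class)
                    .handled(true)
                    .setBody(constant("handled"));

                // continued(true): clear the exception and resume the
                // original route right after the step that failed
                onException(IllegalStateException.class)
                    .continued(true);

                from("direct:handled")
                    .throwException(new IOException("boom"))
                    .setBody(constant("after"));      // skipped when handled

                from("direct:continued")
                    .throwException(new IllegalStateException("boom"))
                    .setBody(constant("after"));      // still runs when continued
            }
        });
        context.start();
        try {
            ProducerTemplate template = context.createProducerTemplate();
            return template.requestBody(endpointUri, "input", String.class);
        } finally {
            context.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(send("direct:handled"));
        System.out.println(send("direct:continued"));
    }
}
```

Note that in the question only the parent onException sets handled(true); the child onException just logs, so by this reasoning the IOException would still be propagated out of route2 to the caller.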
As for transacted(), I don't know. I'm reading up on it and will link you to the answer if I find one.