lai*_*man 1 duplicates apache-camel rabbitmq
我正在做类似的事情
from(rabbitmq:pollingQueue?prefetchSize=1&concurrentConsumer=10)
.process(pollingRequestStatus) // check status of the request, if not ready, requeue = true
.Choice
.when(requeue == true) // request not ready
.to(rabbitmq:pollingQueue)//back to the same queue
.endChoice
.otherwise
.to(proceedToSomethingElse)
.endChoice.end;
Run Code Online (Sandbox Code Playgroud)
当重新排队发生时,消息会重复,这是将消息发回同一队列时的预期行为吗?
我也按照建议尝试了以下类似的东西,但它不起作用,该消息似乎只是被消耗掉了,不会重新排队
from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
.onException(NotReadyException.class)
.handled(true)
.setHeader(RabbitMQConstants.REQUEUE, constant(true))
.end()
.process(pollingRequestStatus) // check status of the request, if not ready, throw NotReadyEception
.to(proceedToSomethingElse);
Run Code Online (Sandbox Code Playgroud)
我尝试过的另外两种方法至少不会创建重复项,
1.) 在 NotReadyExeption 上,将消息发送回 pollingQueue
from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
.onException(NotReadyException.class)
.to(rabbitmq:pollingQueue)
//.delay(constant(8000)) //not sure why it throws error if i set delay
.end
.process(pollingRequestStatus); // check status of the request, if not ready, throw NotReadyEception
Run Code Online (Sandbox Code Playgroud)
这有效,但是,它运行得太快,就像立即一样。如果我设置延迟(常量(数字)),则会引发以下错误,
Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route13 at: >>> From [bla bla bla...]
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1062)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:196)
at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:984)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:3401)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3132)
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:183)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:2961)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:2957)
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:2980)
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:2957)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:2924)
at com.mbww.ithink.runner.Main.main(Main.java:174)
Caused by: java.lang.IllegalArgumentException: Route route13 has no output processors. You need to add outputs to the route such as to("log:foo").
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1060)
Run Code Online (Sandbox Code Playgroud)
2.) 在 NotReadyException 上,基于 redeliveryPolicy 重新投递
from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
.onException(NotReadyException.class)
.setFaultBody(constant(false))
.maximumRedeliveries(-1) // -1 = redeliver forever
.redeliveryDelay(10000)
.end
.process(pollingRequestStatus); // check status of the request, if not ready, throw NotReadyEception
Run Code Online (Sandbox Code Playgroud)
原来requeue的想法是,如果请求没有准备好,就将消息重新排队到队列的后面,设置一个延迟并检查下一个请求的状态,避免得到诸如Ratelimit错误之类的东西。似乎重新交付政策是现在要走的路。
谢谢
为了能够重新排队消息,您必须关闭 RabbitMQ 的自动确认。在这种情况下,您必须手动将ack
,nack
或reject
消息发送回发布者。( https://www.rabbitmq.com/confirms.html )
这意味着您必须在当前实现上手动调用basicAck
,basicNack
或basicReject
函数之一Channel
。
要关闭自动确认,请添加autoAck=false
端点参数。
AFAIK,Camel Endpoint 的底层 Channel 不可访问(source),因此您不能直接调用 Channel 的basicReject(long deliveryTag, boolean requeue)
函数,但 Camel 在交换失败时调用它(路由期间发生异常)。
解决方法可能如下:(伪代码,我没有尝试过,但基于检查camel-rabbitmq
端点的来源,尤其是这部分)
from("rabbitmq://localhost:5672/first?queue=test&concurrentConsumers=10prefetchSize=1&autoAck=false&autoDelete=false")
.onException(NotReadyException.class)
.log("Error for ${body}! Requeue")
.asyncDelayedRedelivery().redeliveryDelay(5000) // wait 5 secs to redeliver and requeue
.maximumRedeliveries(1)
.setHeader(RabbitMQConstants.REQUEUE, constant(true))
.handled(true)
.setFaultBody(constant(true))
.end()
.log("Received: ${body}")
.process((e) -> {
if(notReady(e))
throw new NotReadyException(); // create a new Exception and throw it if the status is not ready
}
})
.to("direct:somethingElse");
Run Code Online (Sandbox Code Playgroud)
我还创建了一个实现几乎相同场景的要点。希望能帮助到你!