Apache Camel:RabbitMQ 将消息重新排队到同一队列会导致消息重复

lai*_*man 1 duplicates apache-camel rabbitmq

我正在做类似的事情

  from(rabbitmq:pollingQueue?prefetchSize=1&concurrentConsumer=10)
        .process(pollingRequestStatus) // check status of the request, if not ready, requeue = true
        .Choice
           .when(requeue == true) // request not ready
           .to(rabbitmq:pollingQueue)//back to the same queue
        .endChoice
        .otherwise
        .to(proceedToSomethingElse)
        .endChoice.end;
Run Code Online (Sandbox Code Playgroud)

当重新排队发生时,消息会重复,这是将消息发回同一队列时的预期行为吗?

我也按照建议尝试了以下类似的东西,但它不起作用,该消息似乎只是被消耗掉了,不会重新排队

from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
     .onException(NotReadyException.class)
     .handled(true)
     .setHeader(RabbitMQConstants.REQUEUE, constant(true))
     .end() 
     .process(pollingRequestStatus) // check status of the request, if not ready, throw NotReadyEception
        .to(proceedToSomethingElse);
Run Code Online (Sandbox Code Playgroud)

我尝试过的另外两种方法至少不会创建重复项,

1.) 在 NotReadyExeption 上,将消息发送回 pollingQueue

from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
    .onException(NotReadyException.class)
    .to(rabbitmq:pollingQueue)
    //.delay(constant(8000)) //not sure why it throws error if i set delay
    .end
    .process(pollingRequestStatus); // check status of the request, if not ready, throw NotReadyEception
Run Code Online (Sandbox Code Playgroud)

这有效,但是,它运行得太快,就像立即一样。如果我设置延迟(常量(数字)),则会引发以下错误,

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route13 at: >>> From [bla bla bla...]
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1062)
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:196)
    at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:984)
    at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:3401)
    at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3132)
    at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:183)
    at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:2961)
    at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:2957)
    at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:2980)
    at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:2957)
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:2924)
    at com.mbww.ithink.runner.Main.main(Main.java:174)
Caused by: java.lang.IllegalArgumentException: Route route13 has no output processors. You need to add outputs to the route such as to("log:foo").
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1060)
Run Code Online (Sandbox Code Playgroud)

2.) 在 NotReadyException 上,基于 redeliveryPolicy 重新投递

from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
    .onException(NotReadyException.class)
    .setFaultBody(constant(false))
    .maximumRedeliveries(-1) // -1 = redeliver forever
    .redeliveryDelay(10000)
    .end
    .process(pollingRequestStatus); // check status of the request, if not ready, throw NotReadyEception
Run Code Online (Sandbox Code Playgroud)

原来requeue的想法是,如果请求没有准备好,就将消息重新排队到队列的后面,设置一个延迟并检查下一个请求的状态,避免得到诸如Ratelimit错误之类的东西。似乎重新交付政策是现在要走的路。

谢谢

mgy*_*osi 5

为了能够重新排队消息,您必须关闭 RabbitMQ 的自动确认。在这种情况下,您必须手动将ack,nackreject消息发送回发布者。( https://www.rabbitmq.com/confirms.html )

这意味着您必须在当前实现上手动调用basicAck,basicNackbasicReject函数之一Channel


翻译成骆驼:

要关闭自动确认,请添加autoAck=false端点参数。

AFAIK,Camel Endpoint 的底层 Channel 不可访问(source),因此您不能直接调用 Channel 的basicReject(long deliveryTag, boolean requeue)函数,但 Camel 在交换失败时调用它(路由期间发生异常)。

解决方法可能如下:(伪代码,我没有尝试过,但基于检查camel-rabbitmq端点的来源,尤其是部分)

更新的解决方法(经过测试并有效):

from("rabbitmq://localhost:5672/first?queue=test&concurrentConsumers=10prefetchSize=1&autoAck=false&autoDelete=false")
            .onException(NotReadyException.class)
                .log("Error for ${body}! Requeue")
                .asyncDelayedRedelivery().redeliveryDelay(5000) // wait 5 secs to redeliver and requeue
                .maximumRedeliveries(1)
                .setHeader(RabbitMQConstants.REQUEUE, constant(true))
                .handled(true)
                .setFaultBody(constant(true))
            .end()
            .log("Received: ${body}")
            .process((e) -> {
                if(notReady(e))
                    throw new NotReadyException(); // create a new Exception and throw it if the status is not ready
                }
            })
            .to("direct:somethingElse");
Run Code Online (Sandbox Code Playgroud)

我还创建了一个实现几乎相同场景的要点。希望能帮助到你!