看完之后,
我仍然对以下事情感到困惑,如果我错了,请纠正我.
非常感谢.:)
我正在做类似的事情
from(rabbitmq:pollingQueue?prefetchSize=1&concurrentConsumer=10)
.process(pollingRequestStatus) // check status of the request, if not ready, requeue = true
.Choice
.when(requeue == true) // request not ready
.to(rabbitmq:pollingQueue)//back to the same queue
.endChoice
.otherwise
.to(proceedToSomethingElse)
.endChoice.end;
Run Code Online (Sandbox Code Playgroud)
当重新排队发生时,消息会重复,这是将消息发回同一队列时的预期行为吗?
我也按照建议尝试了以下类似的东西,但它不起作用,该消息似乎只是被消耗掉了,不会重新排队
from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
.onException(NotReadyException.class)
.handled(true)
.setHeader(RabbitMQConstants.REQUEUE, constant(true))
.end()
.process(pollingRequestStatus) // check status of the request, if not ready, throw NotReadyEception
.to(proceedToSomethingElse);
Run Code Online (Sandbox Code Playgroud)
我尝试过的另外两种方法至少不会创建重复项,
1.) 在 NotReadyExeption 上,将消息发送回 pollingQueue
from(rabbitmq:pollingQueue? prefetchSize=1&concurrentConsumer=10)
.onException(NotReadyException.class)
.to(rabbitmq:pollingQueue)
//.delay(constant(8000)) //not sure why it throws error if i set delay
.end
.process(pollingRequestStatus); // check status …Run Code Online (Sandbox Code Playgroud)