如何在Spring异步MessageListener用例中发生业务异常时请求RabbitMQ重试

San*_*osh 16 java spring rabbitmq

我有一个Spring AMQP消息监听器正在运行.

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如您所见,在处理过程中可能会出现异常.我想重试因为Catch块中的特定错误.我不能通过onMessage中的异常.如何告诉RabbitMQ有异常并重试?

Naz*_* K. 29

由于onMessage()不允许抛出已检查的异常,因此可以将异常包装在a中RuntimeException并重新抛出它.

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}
Run Code Online (Sandbox Code Playgroud)

但请注意,这可能会导致消息无限期地重新传递.这是如何工作的:

RabbitMQ支持拒绝消息并要求代理重新排队.这在这里显示.但RabbitMQ本身并没有重试策略的机制,例如设置最大重试次数,延迟等.

使用Spring AMQP时,"拒绝重新排队"是默认选项.SimpleMessageListenerContainer默认情况下,Spring 会在存在未处理的异常时执行此操作.因此,在您的情况下,您只需要重新抛出捕获的异常.但请注意,如果您无法处理消息并且始终抛出异常,则会无限期地重新传递该消息并导致无限循环.

您可以通过抛出AmqpRejectAndDontRequeueException异常来覆盖每个消息的此行为,在这种情况下,消息不会被重新排队.

您还可以SimpleMessageListenerContainer通过设置完全关闭"拒绝重新排队"行为

container.setDefaultRequeueRejected(false) 
Run Code Online (Sandbox Code Playgroud)

当消息被拒绝且没有被重新排队时,如果在RabbitMQ中设置了一个消息,它将丢失或传送到DLQ.

如果您需要最大的尝试,延迟,重试政策等最简单的就是建立一个春天"无状态" RetryOperationsInterceptor,这将完成所有的重试线程(使用中Thread.sleep()),而每个重试(拒绝消息,以便不用回到RabbitMQ的每个重试).重试耗尽时,默认情况下会记录一条警告并消耗该消息.如果要发送到DLQ,您将需要一个RepublishMessageRecoverer或一个自定义MessageRecoverer拒绝该消息而不重新排队(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ).默认消息恢复器的示例:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});
Run Code Online (Sandbox Code Playgroud)

这显然有一个缺点,即您将在整个重试期间占用线程.您还可以使用"状态"选项RetryOperationsInterceptor,这将消息发回的RabbitMQ每个重试,但延迟仍然会与执行Thread.sleep()的应用程序中,再加上建立一个有状态的拦截器是一个比较复杂一点.

因此,如果您希望重试延迟而不占用Thread,则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案.如果你不想要指数退避(所以延迟不会在每次重试时增加),它会更简单一些.要实现这样的解决方案,您基本上在rabbitMQ上使用参数创建另一个队列:"x-message-ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>".然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>".所以现在当你拒绝并且不重新排队消息时,RabbitMQ会将其重定向到第二个队列.当TTL过期时,它将被重定向到原始队列,从而重新传送到应用程序.所以现在你需要一个重试拦截器,在每次失败后拒绝给RabbitMQ的消息,并跟踪重试次数.为了避免在应用程序中保持状态的需要(因为如果您的应用程序是群集的,您需要复制状态),您可以从x-deathRabbitMQ设置的标头计算重试计数.在此处查看有关此标题的更多信息.因此,在这一点上实现自定义拦截器比使用此行为自定义Spring状态拦截器更容易.

另请参阅Spring AMQP参考中有关重试的部分.