San*_*osh 16 java spring rabbitmq
我有一个Spring AMQP消息监听器正在运行.
public class ConsumerService implements MessageListener {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message) {
try {
testService.process(message); //This process method can throw Business Exception
} catch (BusinessException e) {
//Here we can just log the exception. How the retry attempt is made?
} catch (Exception e) {
//Here we can just log the exception. How the retry attempt is made?
}
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,在处理过程中可能会出现异常.我想重试因为Catch块中的特定错误.我不能通过onMessage中的异常.如何告诉RabbitMQ有异常并重试?
Naz*_* K. 29
由于onMessage()
不允许抛出已检查的异常,因此可以将异常包装在a中RuntimeException
并重新抛出它.
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
Run Code Online (Sandbox Code Playgroud)
但请注意,这可能会导致消息无限期地重新传递.这是如何工作的:
RabbitMQ支持拒绝消息并要求代理重新排队.这在这里显示.但RabbitMQ本身并没有重试策略的机制,例如设置最大重试次数,延迟等.
使用Spring AMQP时,"拒绝重新排队"是默认选项.SimpleMessageListenerContainer
默认情况下,Spring 会在存在未处理的异常时执行此操作.因此,在您的情况下,您只需要重新抛出捕获的异常.但请注意,如果您无法处理消息并且始终抛出异常,则会无限期地重新传递该消息并导致无限循环.
您可以通过抛出AmqpRejectAndDontRequeueException
异常来覆盖每个消息的此行为,在这种情况下,消息不会被重新排队.
您还可以SimpleMessageListenerContainer
通过设置完全关闭"拒绝重新排队"行为
container.setDefaultRequeueRejected(false)
Run Code Online (Sandbox Code Playgroud)
当消息被拒绝且没有被重新排队时,如果在RabbitMQ中设置了一个消息,它将丢失或传送到DLQ.
如果您需要最大的尝试,延迟,重试政策等最简单的就是建立一个春天"无状态" RetryOperationsInterceptor
,这将完成所有的重试线程(使用中Thread.sleep()
),而每个重试(拒绝消息,以便不用回到RabbitMQ的每个重试).重试耗尽时,默认情况下会记录一条警告并消耗该消息.如果要发送到DLQ,您将需要一个RepublishMessageRecoverer
或一个自定义MessageRecoverer
拒绝该消息而不重新排队(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ).默认消息恢复器的示例:
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
Run Code Online (Sandbox Code Playgroud)
这显然有一个缺点,即您将在整个重试期间占用线程.您还可以使用"状态"选项RetryOperationsInterceptor
,这将消息发回的RabbitMQ每个重试,但延迟仍然会与执行Thread.sleep()
的应用程序中,再加上建立一个有状态的拦截器是一个比较复杂一点.
因此,如果您希望重试延迟而不占用Thread
,则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案.如果你不想要指数退避(所以延迟不会在每次重试时增加),它会更简单一些.要实现这样的解决方案,您基本上在rabbitMQ上使用参数创建另一个队列:"x-message-ttl": <delay time in milliseconds>
和"x-dead-letter-exchange":"<name of the original queue>"
.然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"
.所以现在当你拒绝并且不重新排队消息时,RabbitMQ会将其重定向到第二个队列.当TTL过期时,它将被重定向到原始队列,从而重新传送到应用程序.所以现在你需要一个重试拦截器,在每次失败后拒绝给RabbitMQ的消息,并跟踪重试次数.为了避免在应用程序中保持状态的需要(因为如果您的应用程序是群集的,您需要复制状态),您可以从x-death
RabbitMQ设置的标头计算重试计数.在此处查看有关此标题的更多信息.因此,在这一点上实现自定义拦截器比使用此行为自定义Spring状态拦截器更容易.
归档时间: |
|
查看次数: |
9052 次 |
最近记录: |