Mat*_*nkt 4 java spring rabbitmq spring-amqp spring-retry
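I am trying to configure Spring AMQP to retry a message only a defined number of times. Currently, a message that fails, for example because of a DataIntegrityViolationException, is redelivered indefinitely.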
@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateful()
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .maxAttempts(3)
            .messageKeyGenerator(message -> UUID.randomUUID().toString())
            .build();
}
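This does not seem to be applied: messages are still retried indefinitely.
It feels like I am missing something here.
Here is the rest of my AMQP configuration: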
@Bean
Queue testEventSubscriberQueue() {
    final boolean durable = true;
    return new Queue("testEventSubscriberQueue", durable);
}

@Bean
Binding binding(TopicExchange topicExchange) {
    return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
}

@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(testEventSubscriberQueue().getName());
    container.setMessageListener(listenerAdapter);
    container.setChannelTransacted(true);
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
    MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
    listenerAdapter.setMessageConverter(messageConverter);
    return listenerAdapter;
}

@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
    final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
    jsonMessageConverter.setJsonObjectMapper(objectMapper);
    DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
    defaultClassMapper.setDefaultType(EventPayload.class);
    jsonMessageConverter.setClassMapper(defaultClassMapper);
    final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
    messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
    return messageConverter;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    //rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
public TopicExchange testExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
}
I am using spring-amqp 1.5.1.RELEASE.
Any help is appreciated.

You need to configure the container to add the interceptor to its advice chain...
container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
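
For context, here is a sketch of how the question's container bean might look with that line added. It assumes the `statefulRetryOperationsInterceptor()` bean method from the question is defined in the same configuration class; everything else is copied from the question's configuration. It is a fragment of a configuration class, not a complete program.

```java
// Fragment of the question's configuration class.
// Requires: import org.aopalliance.aop.Advice;
@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(testEventSubscriberQueue().getName());
    container.setMessageListener(listenerAdapter);
    container.setChannelTransacted(true);
    // Wrap listener invocations with the retry interceptor; without this,
    // the interceptor bean exists but is never applied to the listener.
    container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
    return container;
}
```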
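
One further point that may be worth checking, separate from the advice chain: stateful retry tracks attempts per message key, so the key needs to be the same each time the broker redelivers the same message. The `messageKeyGenerator` in the question returns a fresh `UUID.randomUUID()` on every call, so each redelivery would look like a brand-new message and `maxAttempts` would likely never be reached. The plain-Java sketch below is a simplified model of that bookkeeping, not Spring Retry's actual implementation; `StatefulRetryKeyDemo`, `Msg`, and `deliveriesUntilExhausted` are hypothetical names made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

public class StatefulRetryKeyDemo {

    /** Hypothetical stand-in for a broker message: just an id and a body. */
    public static final class Msg {
        public final String id;
        public final String body;

        public Msg(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /**
     * Simulates a broker redelivering one message whose listener always fails.
     * Returns the delivery number at which the attempt count for the message's
     * key reached maxAttempts, or deliveryCap if that never happened.
     */
    public static int deliveriesUntilExhausted(Function<Msg, String> keyGenerator,
                                               int maxAttempts, int deliveryCap) {
        Map<String, Integer> attemptsByKey = new HashMap<>(); // retry state, keyed by message key
        Msg msg = new Msg("msg-1", "payload");
        for (int delivery = 1; delivery <= deliveryCap; delivery++) {
            String key = keyGenerator.apply(msg);
            int attempts = attemptsByKey.merge(key, 1, Integer::sum);
            if (attempts >= maxAttempts) {
                return delivery; // retries exhausted: stop redelivering
            }
            // Otherwise the listener "fails" and the broker redelivers.
        }
        return deliveryCap; // cap reached without ever exhausting retries
    }

    public static void main(String[] args) {
        // Stable key: the same message maps to the same retry state each time.
        System.out.println(deliveriesUntilExhausted(m -> m.id, 3, 100)); // prints 3
        // Random key: every delivery starts a new retry state.
        System.out.println(deliveriesUntilExhausted(m -> UUID.randomUUID().toString(), 3, 100)); // prints 100
    }
}
```

In the real configuration, a stable key could come from a message id set by the publisher, for example `message.getMessageProperties().getMessageId()`, assuming the publisher actually sets one.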