Mar*_*ars 8 java spring spring-kafka
我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler ,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应重试。
现在,我在配置类中拥有以下可按预期工作的 bean:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在同一个配置类中执行以下操作:
@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
Run Code Online (Sandbox Code Playgroud)
但似乎不起作用。如果出现错误,重试次数是默认的,正如我在日志中看到的:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - 退避 FixBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX
应该如何使用这个 DefaultErrorHandler 才能实现所需的行为?或者我应该使用其他东西?
提前谢谢!
factory.setCommonErrorHandler(new Default....)
bean的引导自动配置CommonErrorHandler
需要 Boot 2.6。
https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e
小智 5
工厂.setErrorHandler(新的SeekToCurrentErrorHandler(新的FixedBackOff(0L,1L))); 实际上,它最多会重试交付 1 次(2 次交付尝试)。(https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)
默认重试次数是 **9(( (FixedBackOff(0L, 9L)) 而不是1 ( https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-呃)
你应该尝试setCommonErrorHandler
而不是setErrorHandler
喜欢factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));
归档时间: |
|
查看次数: |
15041 次 |
最近记录: |