如何用 DefaultErrorHandler 替换已弃用的 SeekToCurrentErrorHandler (spring-kafka)?

Mar*_*ars 8 java spring spring-kafka

我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler ,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应重试。

现在,我在配置类中拥有以下可按预期工作的 bean:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在同一个配置类中执行以下操作:

@Bean
public DefaultErrorHandler eh() {
    return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
Run Code Online (Sandbox Code Playgroud)

但似乎不起作用。如果出现错误,重试次数是默认的,正如我在日志中看到的:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - 退避 FixBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX

应该如何使用这个 DefaultErrorHandler 才能实现所需的行为?或者我应该使用其他东西?

提前谢谢!

Gar*_*ell 8

factory.setCommonErrorHandler(new Default....)

bean的引导自动配置CommonErrorHandler需要 Boot 2.6。

https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e

  • @geneb。为 Kafka 创建整个 Spring 模块还不够?您还需要*直接*复制/粘贴吗?快点 (2认同)

小智 5

  1. 工厂.setErrorHandler(新的SeekToCurrentErrorHandler(新的FixedBackOff(0L,1L))); 实际上,它最多会重试交付 1 次(2 次交付尝试)。(https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current

  2. 默认重试次数是 **9(( (FixedBackOff(0L, 9L)) 而不是1 ( https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-呃

  3. 你应该尝试setCommonErrorHandler而不是setErrorHandler喜欢factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));