Spring kafka setErrorHandler 已弃用替换(引导 2.6.4)

Tim*_*thy 8 apache-kafka spring-kafka

在 Spring Boot 2.6.4 上,此方法已被弃用。

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

全局错误处理程序类

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}
Run Code Online (Sandbox Code Playgroud)

这个的替代样本是什么?文档说我应该使用setCommonErrorHandler,但是如何实现该CommonErrorHandler接口,因为那里没有可以重写的方法。

要点是,我必须根据特定条件向运营团队发送松弛通知(消息 tpye,可在 kafka 消息头中找到)

这并不是阻塞,只是一条烦人的已弃用消息。谢谢

Mat*_*las 7

我面临着完全相同的问题,所以我改变了方法实现 ConsumerAwareErrorHandler

通用错误处理程序

并实施了

处理记录

就像文档中描述的那样,它有效!

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    log.warn("Global error handler for message: {}", record.value().toString());
  }
}
Run Code Online (Sandbox Code Playgroud)

在KafkaConfig.class中

  @Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }
Run Code Online (Sandbox Code Playgroud)


Gar*_*ell 5

请参阅 Spring for Apache Kafka 文档;遗留错误处理程序被替换为CommonErrorHandler实现。

什么是新的?

https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh

旧版 GenericErrorHandler 及其用于记录批处理侦听器的子接口层次结构已被新的单一接口 CommonErrorHandler 取代,其实现与大多数旧版 GenericErrorHandler 实现相对应。有关更多信息,请参阅容器错误处理程序。

容器错误处理程序

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

从版本 2.8 开始,旧版本ErrorHandlerBatchErrorHandler接口已被新的CommonErrorHandler. 这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。提供了 CommonErrorHandler 实现来替换大多数遗留框架错误处理程序实现,并且不推荐使用遗留错误处理程序。侦听器容器和侦听器容器工厂仍然支持旧接口;它们将在未来版本中被弃用。