Tim*_*thy 8 apache-kafka spring-kafka
在 Spring Boot 2.6.4 上,此方法已被弃用。
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
// deprecated
factory.setErrorHandler(new GlobalErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
全局错误处理程序类
public class GlobalErrorHandler implements ConsumerAwareErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
// my custom global logic (e.g. notify ops team via slack)
}
}
Run Code Online (Sandbox Code Playgroud)
这个的替代样本是什么?文档说我应该使用setCommonErrorHandler,但是如何实现该CommonErrorHandler接口,因为那里没有可以重写的方法。
要点是,我必须根据特定条件向运营团队发送松弛通知(消息 tpye,可在 kafka 消息头中找到)
这并不是阻塞,只是一条烦人的已弃用消息。谢谢
我面临着完全相同的问题,所以我改变了方法实现 ConsumerAwareErrorHandler
通用错误处理程序
并实施了
处理记录
就像文档中描述的那样,它有效!
public class GlobalErrorHandler implements CommonErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handleRecord(
Exception thrownException,
ConsumerRecord<?, ?> record,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
log.warn("Global error handler for message: {}", record.value().toString());
}
}
Run Code Online (Sandbox Code Playgroud)
在KafkaConfig.class中
@Bean(value = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory());
factory.setCommonErrorHandler(new GlobalErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
请参阅 Spring for Apache Kafka 文档;遗留错误处理程序被替换为CommonErrorHandler实现。
什么是新的?
https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh
旧版 GenericErrorHandler 及其用于记录批处理侦听器的子接口层次结构已被新的单一接口 CommonErrorHandler 取代,其实现与大多数旧版 GenericErrorHandler 实现相对应。有关更多信息,请参阅容器错误处理程序。
容器错误处理程序
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers
从版本 2.8 开始,旧版本
ErrorHandler和BatchErrorHandler接口已被新的CommonErrorHandler. 这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。提供了 CommonErrorHandler 实现来替换大多数遗留框架错误处理程序实现,并且不推荐使用遗留错误处理程序。侦听器容器和侦听器容器工厂仍然支持旧接口;它们将在未来版本中被弃用。
| 归档时间: |
|
| 查看次数: |
12694 次 |
| 最近记录: |