Erc*_*isa 7 apache-kafka spring-retry spring-kafka
我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。
重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException。
然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。
如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。
关于如何实现此功能的任何想法?
注:enable.auto.commit设为false,auto.offset.reset设为earliest。
谢谢!
FailedRecordTrackerApache Kafka有一个since Spring 2.2(尚未发布):
从版本 2.2 开始,
SeekToCurrentErrorHandler现在可以恢复(跳过)不断失败的记录。默认情况下,10次失败后,将记录失败的记录(ERROR)。您可以使用自定义恢复程序 ( ) 和/或最大失败次数来配置处理程序BiConsumer。
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures - e.g. send to a dead-letter topic
}, 3);
Run Code Online (Sandbox Code Playgroud)
因此,您只需将源代码复制/粘贴FailedRecordTracker到SeekToCurrentErrorHandler您master的项目中,您就会拥有您正在寻找的功能:
| 归档时间: |
|
| 查看次数: |
9247 次 |
| 最近记录: |