Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

Erc*_*isa 7 apache-kafka spring-retry spring-kafka

我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}
Run Code Online (Sandbox Code Playgroud)

ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。

重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException

然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。

如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。

关于如何实现此功能的任何想法?

注:enable.auto.commit设为falseauto.offset.reset设为earliest

谢谢!

Art*_*lan 4

FailedRecordTrackerApache Kafka有一个since Spring 2.2(尚未发布):

https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

从版本 2.2 开始,SeekToCurrentErrorHandler现在可以恢复(跳过)不断失败的记录。默认情况下,10次失败后,将记录失败的记录(ERROR)。您可以使用自定义恢复程序 ( ) 和/或最大失败次数来配置处理程序BiConsumer

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);
Run Code Online (Sandbox Code Playgroud)

因此,您只需将源代码复制/粘贴FailedRecordTrackerSeekToCurrentErrorHandlermaster的项目中,您就会拥有您正在寻找的功能:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java