Spring Boot Kafka 配置 DefaultErrorHandler?

shj*_*shj 4 spring apache-kafka spring-boot spring-kafka

我按照 Spring Kafka 文档创建了一个批处理消费者:

@SpringBootApplication
public class ApplicationConsumer {
  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
  private static final String TOPIC = "foo";

  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
  }

  @Bean
  public RecordMessageConverter converter() {
    return new JsonMessageConverter();
  }

  @Bean
  public BatchMessagingMessageConverter batchConverter() {
    return new BatchMessagingMessageConverter(converter());
  }

  @KafkaListener(topics = TOPIC)
  public void listen(List<Name> ps) {
    LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
  }
}
Run Code Online (Sandbox Code Playgroud)

我能够通过定义 Spring 自动选取的以下附加配置环境变量来成功让使用者运行:

export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...
Run Code Online (Sandbox Code Playgroud)

所以上面的代码有效。但现在我想自定义默认错误处理程序以使用指数退避。从参考文档中,我尝试将以下内容添加到 ApplicationConsumer 类中:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    return props;
}
Run Code Online (Sandbox Code Playgroud)

但现在我收到错误消息,说找不到某些配置。看起来我不得不重新定义之前已经自动定义的consumerConfigs() 中的所有属性。这包括从引导服务器 uri 到 json 反序列化配置的所有内容。

有没有一种好方法可以更新我的第一个版本的代码以覆盖默认错误处理程序?

Gar*_*ell 7

只需将错误处理程序定义为 a @Bean,Boot 就会自动将其连接到其自动配置的容器工厂中。

编辑

这对我来说是预期的:

@SpringBootApplication
public class So70884203Application {

    public static void main(String[] args) {
        SpringApplication.run(So70884203Application.class, args);
    }

    @Bean
    DefaultErrorHandler eh() {
        return new DefaultErrorHandler((rec, ex) -> {
            System.out.println("Recovered: " + rec);
        }, new FixedBackOff(0L, 0L));
    }

    @KafkaListener(id = "so70884203", topics = "so70884203")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so70884203").partitions(1).replicas(1).build();
    }

}
Run Code Online (Sandbox Code Playgroud)
foo
Recovered: ConsumerRecord(topic = so70884203, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643316625291, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
Run Code Online (Sandbox Code Playgroud)