小编ama*_*oqz的帖子

Kafka Consumer:引发异常时停止处理消息

我对 (Spring) Kafka 在停止ConcurrentMessageListenerContainer.

我想要实现的目标:在引发异常后停止消费者(例如消息无法保存到数据库),不提交偏移量,在给定时间后重新启动它并从先前失败的消息重新开始处理。

我读过这篇文章,它说容器将使用民意调查中的剩余记录(https://github.com/spring-projects/spring-kafka/issues/451)调用侦听器,这意味着不能保证之后失败的消息 成功处理的进一步消息将提交偏移量。这可能会导致消息丢失/跳过。

真的是这样吗,如果是,是否有解决方案可以在不升级较新版本的情况下解决此问题?(DLQ 不是我的情况的解决方案)

我已经做了什么:设置setErrorHandler()setAckOnError(false)

private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps,  Class keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    //Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation()); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api spring-kafka

6
推荐指数
1
解决办法
5249
查看次数