spring kafka 偏移量增量,即使自动提交偏移量设置为 false

Tus*_*nne 7 java apache-kafka kafka-consumer-api spring-kafka

我正在尝试实施manual offset commit在 上收到的消息kafka。我已将偏移量提交设置为false,但偏移量值不断增加。

不知道是什么原因。需要帮助解决问题。

下面是代码

应用程序.yml

spring:
  application:
    name: kafka-consumer-sample
  resources:
    cache:
      period: 60m

kafka:
      bootstrapServers: localhost:9092
      options:
        enable:
          auto:
            commit: false
Run Code Online (Sandbox Code Playgroud)

KafkaConfig.java

@Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new DefaultKafkaConsumerFactory<>(config);
    }

 @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record) {

        System.out.println("Consumed Kafka Record: " + record);
        record.timestampType();
        System.out.println("record.timestamp() = " + record.timestamp());
        System.out.println("***********************************");
        System.out.println(record.timestamp());
        System.out.println("record.key() = " + record.key());
        System.out.println("Consumed String Message : " + record.value());
    }
}
Run Code Online (Sandbox Code Playgroud)

输出如下

Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11
Run Code Online (Sandbox Code Playgroud)

属性如下。

auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
Run Code Online (Sandbox Code Playgroud)

这是我重新启动消费者之后的情况。我预计早期的数据也会被打印出来。

我的理解正确吗?请注意,我正在重新启动我的 springboot 应用程序,希望消息从第一个开始。我的卡夫卡服务器和动物园管理员没有终止。

Dea*_*ool 7

如果auto使用此属性禁用确认ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,那么您必须将容器级别的确认模式设置为MANUAL并且不要提交,offset因为默认情况下它设置为BATCH.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

因为当自动确认被禁用时,容器级确认被设置为BATCH

公共无效setAckMode(ContainerProperties.AckMode ackMode)

设置当自动确认(在配置属性中)为 false 时使用的确认模式。

  1. RECORD:每条记录传递给侦听器后进行确认。
  2. BATCH:从消费者收到的每批记录已传递给侦听器后进行确认
  3. TIME:在此毫秒数后确认;(应大于#setPollTimeout(long) pollTimeout。
  4. COUNT:至少收到此数量的记录后确认
  5. 手动:侦听器负责确认 - 使用 AcknowledgingMessageListener。

参数:

ackMode - ContainerProperties.AckMode;默认批次。

承诺抵消

提供了几个用于提交偏移量的选项。如果enable.auto.commit消费者属性为true,Kafka会根据其配置自动提交偏移量。如果为 false,则容器支持多种 AckMode 设置(在下一个列表中描述)。默认 AckMode 为 BATCH。从版本 2.3 开始,框架将enable.auto.commit 设置为 false,除非在配置中明确设置。以前,如果未设置该属性,则使用 Kafka 默认值 (true)。

如果您想始终从头开始阅读,则必须将此属性设置auto.offset.resetearliest

config.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG, "earliest");
Run Code Online (Sandbox Code Playgroud)

注意:确保groupId必须是新的,并且在kafka中没有任何偏移量