spring.kafka.consumer.auto-offset-reset 在 spring-kafka 中如何工作

gst*_*low 5 java spring apache-kafka kafka-producer-api spring-kafka

KafkaPropertiesjava 文档:

/**
  * What to do when there is no initial offset in Kafka or if the current offset
  * does not exist any more on the server.
  */
private String autoOffsetReset;
Run Code Online (Sandbox Code Playgroud)

我有你好世界应用程序,其中包含application.properties

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest
Run Code Online (Sandbox Code Playgroud)

在这种情况下,@KafkaListener将为所有条目调用方法。但预期结果是@KafkaListener仅针对我发送的最新 3 个选项调用该方法。我尝试使用另一个选项:

spring.kafka.consumer.auto-offset-reset=earlisest
Run Code Online (Sandbox Code Playgroud)

但行为是一样的。

你能解释一下这个东西吗?

聚苯乙烯

代码示例:

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}
Run Code Online (Sandbox Code Playgroud)

更新:

行为不取决于
spring.kafka.consumer.auto-offset-reset

它仅取决于 spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit

如果我设置spring.kafka.consumer.enable-auto-commit=false- 我会看到所有记录。

如果我设置spring.kafka.consumer.enable-auto-commit=true- 我只看到最后 3 条记录。

请澄清spring.kafka.consumer.auto-offset-reset财产的含义

Art*_*lan 2

Spring Boot中KafkaProperties是这样做的:

public Map<String, Object> buildProperties() {
        Map<String, Object> properties = new HashMap<String, Object>();
        if (this.autoCommitInterval != null) {
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                    this.autoCommitInterval);
        }
        if (this.autoOffsetReset != null) {
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                    this.autoOffsetReset);
        }
Run Code Online (Sandbox Code Playgroud)

buildProperties()是从buildConsumerProperties()which中使用的,依次在:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<Object, Object>(
            this.properties.buildConsumerProperties());
}
Run Code Online (Sandbox Code Playgroud)

因此,如果您使用自己的ConsumerFactorybean 定义,请务必重用它们KafkaPropertieshttps://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-道具

更新

好的。我明白发生了什么事。

尝试添加此属性:

spring.kafka.consumer.enable-auto-commit=false
Run Code Online (Sandbox Code Playgroud)

这样我们就不会根据某个提交间隔进行异步自动提交。

我们应用程序中的逻辑基于latch.await(60, TimeUnit.SECONDS);. 当我们得到3预期的记录时,我们退出。这样,来自消费者的异步自动提交可能还不会发生。因此,下次运行应用程序时,消费者将从未提交的偏移量中轮询数据。

当我们关闭自动提交时,我们有一个AckMode.BATCH,它是同步执行的,并且我们能够看到该foo消费者组的主题中真正最新的记录。