即使没有提交偏移量,Consumer.poll()也会返回新记录?

Gli*_*ide 9 apache-kafka kafka-consumer-api

如果我有一个enable.auto.commit=false和我打电话consumer.poll()而没有打电话consumer.commitAsync(),为什么consumer.poll()下次打电话时会返回新记录?

由于我没有提交我的偏移量,我希望poll()能够返回最新的偏移量,这应该是相同的记录.

我问,因为我在处理过程中试图处理故障情况.我希望不提交偏移量,poll()将再次返回相同的记录,以便我可以再次重新处理这些失败的记录.

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord record : records) {
                try {
                   //process record
                   consumer.commitAsync();
                } catch (Exception e) {
                }
                /**
                If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
                **/
            }

        }
    }
}
Run Code Online (Sandbox Code Playgroud)

ftr*_*ftr 5

民意调查的起始偏移量不是由经纪人决定的,而是由消费者决定的。消费者跟踪下一个收到的偏移量,并在下一次轮询期间要求以下消息。

当使用者停止或发生故障并且另一个实例不知道上次使用的偏移量获取分区的消耗量时,偏移量提交就起作用了。

KafkaConsumer具有相当广泛的Javadoc,非常值得一读。

  • 你的意思是它只会在第一次调用`poll()`时才使用提交的偏移量? (3认同)
  • 说得通。但是[poll()doc](https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long))说:“上一次消耗的偏移量可以是手动通过seek(TopicPartition,long)设置,或自动设置为分区的已订阅列表的最后提交的偏移量”,后者不执行我的问题吗-将消耗的偏移量设置为最后的提交偏移量-应该如果我从不提交新的偏移量,则导致`poll()`不返回新记录。我的理解正确吗? (2认同)