Gli*_*ide 9 apache-kafka kafka-consumer-api
如果我有一个enable.auto.commit=false和我打电话consumer.poll()而没有打电话consumer.commitAsync(),为什么consumer.poll()下次打电话时会返回新记录?
由于我没有提交我的偏移量,我希望poll()能够返回最新的偏移量,这应该是相同的记录.
我问,因为我在处理过程中试图处理故障情况.我希望不提交偏移量,poll()将再次返回相同的记录,以便我可以再次重新处理这些失败的记录.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
民意调查的起始偏移量不是由经纪人决定的,而是由消费者决定的。消费者跟踪下一个收到的偏移量,并在下一次轮询期间要求以下消息。
当使用者停止或发生故障并且另一个实例不知道上次使用的偏移量获取分区的消耗量时,偏移量提交就起作用了。
KafkaConsumer具有相当广泛的Javadoc,非常值得一读。