Kafka使用者:如果未提交以前的消息,并且想要禁用自动提交,则想再次读取同一条消息

Rah*_*hul 3 apache-kafka kafka-consumer-api

我已关闭自动提交功能,并且在阅读后也未从消费方中提交偏移量。

受检查的消费者延迟也保持不变,这确保了偏移量不会被提交。但是问题是,它再次消耗了下一条味精消息。

我如何才能一次又一次地阅读相同的消息。只有提交了先前的偏移量后,我才应该能够阅读下一条消息。请在这里帮助我。

Sha*_*arg 9

如果您知道您的kafka用户当前正在访问哪个分区,则可以使用kafkaconsumer.seek(partition, offset)方法来告诉您的用户从特定偏移量读取消息。例:

//to get the partition from consumer record
val partition: Int = consumerRecord.partition() 

//to get offset of current record
val recordOffset = consumerRecord.offset() 

if(data processing fail condition)
  consumer.seek(new TopicPartition(topic, partition), recordOffset )

//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100) 
Run Code Online (Sandbox Code Playgroud)