有没有办法在特定的偏移量处停止 Kafka 消费者?

Luc*_*ess 5 apache-kafka

我可以寻求特定的偏移量。有没有办法在特定的偏移量处停止消费者?换句话说,消耗直到我给定的偏移量。据我所知,Kafka 没有提供这样的功能。如果我错了,请纠正我。

例如。分区的偏移量为 1-10。我只想从 3-8 消费。消费完第 8 条消息后,程序应该退出。

Gua*_*Zuo 2

是的,kafka 不提供此功能,但您可以在您的消费者代码中实现此功能。你可以尝试使用commitSync()来控制这个。

公共无效commitSync(地图偏移量)

提交指定主题和分区列表的指定偏移量。这会将偏移量提交给 Kafka。使用此 API 提交的偏移量将在每次重新平衡后的第一次提取以及启动时使用。因此,如果您需要在 Kafka 以外的任何地方存储偏移量,则不应使用此 API。提交的偏移量应该是您的应用程序将使用的下一条消息,即lastProcessedMessageOffset + 1。

这是同步提交,并且将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)。

像这样的东西:

 while (goAhead) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         if (record.offset() > OFFSET_BOUND) {
            consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
            goAhead = false;
            break;           
         }
         process(record);
     }
 }
Run Code Online (Sandbox Code Playgroud)

您应该在上面的代码中将“enable.auto.commit”设置为 false。在您的情况下,OFFSET_BOUND 可以设置为 8。因为在您的示例中提交的偏移量仅为 9,所以下次消费者将从该位置获取。