相关疑难解决方法(0)

seekToEnd 所有分区并在 Kafka 消费者的自动重新平衡中幸存下来

当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想寻找所有分区的末尾,即使在代理端存储了偏移量。如果更多额外的消费者连接到同一个消费者组,他们应该获取最新存储的偏移量。我正在做以下事情:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是当我连接消费者组 A 的第一个消费者 c1 时,一切都按预期工作,如果我连接消费者组 A 的额外消费者 c2,则该组正在重新平衡,并且 c1 将消耗跳过的偏移量。

有任何想法吗?

java kotlin apache-kafka

3
推荐指数
1
解决办法
3014
查看次数

标签 统计

apache-kafka ×1

java ×1

kotlin ×1