Consumer.endOffsets 在 Kafka 中是如何工作的?

use*_*814 5 java apache-kafka kafka-consumer-api spring-kafka

假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移。类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组。

就像是

单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}
Run Code Online (Sandbox Code Playgroud)

多个消费者 - 工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }
Run Code Online (Sandbox Code Playgroud)

版本:Kafka-Client 2.0.0

我是否错误地使用了消费者 API?理想情况下,我想使用单个消费者。

如果您需要更多详细信息,请告诉我。

use*_*814 0

这是Fetcher.fetchOffsetsByTimes()特定内部groupListOffsetRequests方法中的一个错误,其中逻辑没有添加分区以进行重试,而请求分区偏移量的领导者未知或不可用。

当您在所有消费者组分区中使用单个消费者时,这一点更加明显,其中一些组在我们请求时已经拥有主题分区领导者信息,endoffsets并且对于没有领导者信息的主题分区,由于错误而未知或不可用,因此被忽略。

后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行更改以从中读取主题分区AdminClient.listTopics & AdminClient.describeTopics并将其全部传递到Consumer.endOffsets.

尽管这完全不能解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。

更多信息可以找到 - KAFKA-7044& pull request。此问题已修复并计划在 2.1.0 版本中发布。