use*_*814 5 java apache-kafka kafka-consumer-api spring-kafka
假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移。类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组。
就像是
单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
Run Code Online (Sandbox Code Playgroud)
多个消费者 - 工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
Run Code Online (Sandbox Code Playgroud)
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者 API?理想情况下,我想使用单个消费者。
如果您需要更多详细信息,请告诉我。
这是Fetcher.fetchOffsetsByTimes()特定内部groupListOffsetRequests方法中的一个错误,其中逻辑没有添加分区以进行重试,而请求分区偏移量的领导者未知或不可用。
当您在所有消费者组分区中使用单个消费者时,这一点更加明显,其中一些组在我们请求时已经拥有主题分区领导者信息,endoffsets并且对于没有领导者信息的主题分区,由于错误而未知或不可用,因此被忽略。
后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行更改以从中读取主题分区AdminClient.listTopics & AdminClient.describeTopics并将其全部传递到Consumer.endOffsets.
尽管这完全不能解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。
更多信息可以找到 - KAFKA-7044& pull request。此问题已修复并计划在 2.1.0 版本中发布。
| 归档时间: |
|
| 查看次数: |
2494 次 |
| 最近记录: |