Are*_*s91 2 java apache-kafka kafka-consumer-api
由于可以从标题中邀请来宾,是否有一种方法可以获取有关Java中特定主题的使用者列表?直到现在,我仍然能够获得像这样的主题列表
final ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
Set<String> map = kafkaFuture.get();
Run Code Online (Sandbox Code Playgroud)
但我还没有找到一种获取每个主题的消费者列表的方法
我最近正在为我的kafka客户端工具解决相同的问题。这并不容易,但是从代码中发现的唯一方法是:
Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);
//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
stream().map(s -> s.groupId()).collect(Collectors.toList());
//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
ConsumerGroupDescription descr = groups.get(groupId);
//find if any description is connected to the topic with topicName
Optional<TopicPartition> tp = descr.members().stream().
map(s -> s.assignment().topicPartitions()).
flatMap(coll -> coll.stream()).
filter(s -> s.topic().equals(topicName)).findAny();
if (tp.isPresent()) {
//you found the consumer, so collect the group id somewhere
}
}
Run Code Online (Sandbox Code Playgroud)
该API可从2.0版获得。也许有更好的方法,但是我找不到。您也可以在我的位桶上找到代码
小智 5
我知道这个主题有点旧,但我目前正在开发一个仪表板,需要列出相关消费者群体的主题。 Katya的回答是可以的,但是它仍然不会列出没有活跃成员的消费者组的TopicPartitions。我遇到了这个解决方案,希望它有所帮助:
Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);
Map<TopicPartition, OffsetAndMetadata> offsets = kafkaClient
.listConsumerGroupOffsets("consumer_group_name")
.partitionsToOffsetAndMetadata()
.get();
Run Code Online (Sandbox Code Playgroud)
偏移映射中的 TopicPartition 可以解析为特定主题。对于Katya 的回答中的整个 groupId 列表,可以重复此列表。
归档时间: |
|
查看次数: |
397 次 |
最近记录: |