查找分配给 Kafka 流实例的分区

Ash*_*yas 5 apache-kafka apache-kafka-streams

我有一个订阅许多主题的 Kafka 流应用程序,每个主题都有许多分区。当我创建应用程序拓扑并启动它时,我是否知道将哪些主题的哪些分区分配给我的应用程序的当前实例?我想知道这与此实例是否处理任何记录无关。

我知道当我得到一条记录时,我可以做到processorContext.partition()processorContext.topic()获取正在处理的当前记录的分区/主题信息。但我不是在寻找那个。

我正在寻找KafkaConsumer.assigmentkafka 流方面的等价物。

我也尝试了以下代码,但我得到 s 的大小为 0。

<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
    Set<TopicPartition> tp = m.topicPartitions();
    System.out.println  ("TopicPartition: " + tp.toString());
}
Run Code Online (Sandbox Code Playgroud)

Mic*_*oll 3

更新答案(2020 年 11 月):

当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给了我的应用程序的当前实例?

如果我理解正确的话,您可以按如下方式执行此操作。在您的应用程序实例中,用于KafkaStreams#localThreadsMetadata()获取ThreadMetadata(该应用程序实例的)所有本地流线程。ThreadMetadata包含TaskMetadata该流线程上的所有活动和备用任务。TaskMetadata有一个方法topicPartitions()来获取输入主题分区。

旧的、过时的答案:据我所知,Kafka Streams 中没有现有的 API 可以公开此信息。可以从 Kafka 消费者(由 Kafka Streams 使用)获取此信息,但它不会在 Kafka Streams 中公开。

  • 今天似乎有可能,我用说明更新了我的答案。 (2认同)