kafka消费者动态检测添加的主题

sid*_*ddu 5 java apache-kafka kafka-consumer-api

我正在使用KafkaConsumer来消费来自Kafka服务器(主题)的消息.

  • 它适用于在启动消费者代码之前创建的主题...

但问题是,如果动态创建主题(我的意思是说消费者代码启动后),它将无法工作,但API表示它将支持动态主题创建..这是您的参考链接..

使用的Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这是JAVA代码......

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }
Run Code Online (Sandbox Code Playgroud)

注意:我的主题名称与正则表达式匹配..如果我重新启动消费者,那么它将开始阅读推送到主题的消息...

任何帮助真的很感激......

bhs*_*cer 13

在apache kafka邮件档案中有一个答案.我在下面复制它:

使用者支持配置选项"metadata.max.age.ms",它基本上控制提取主题元数据的频率.默认情况下,此设置相当高(5分钟),这意味着发现与正则表达式匹配的新主题最多需要5分钟.您可以将此值设置得更低,以便更快地发现主题.

所以在你的道具中你可以:

props.put("metadata.max.age.ms", 5000);
Run Code Online (Sandbox Code Playgroud)

这将使您的消费者每5秒钟了解一下新主题.


Dav*_*fin 4

您可以连接到 Zookeeper。查看示例代码。本质上,您将在 Zookeeper 节点上创建一个观察者/brokers/topics。当此处添加新的子项时,即添加了一个新主题,并且您的观察者将被触发。

请注意,这个答案与另一个答案之间的区别在于,这个答案是触发器,而另一个答案是轮询——这个答案将尽可能接近实时,另一个答案最多将在您的轮询间隔内。