Java,如何在apache kafka中获取主题中的消息数量

Che*_*tan 70 java messages apache-kafka

我正在使用apache kafka进行消息传递.我用Java实现了生产者和使用者.我们如何获取主题中的消息数量?

sse*_*hev 79

它不是java,但可能有用

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'
Run Code Online (Sandbox Code Playgroud)

  • 这不应该是每个分区总和的最早和最新偏移的差异吗?`bash-4.3#$ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F":"'{sum + = $ 3} END {print sum}'13818663 bash-4.3#$ KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95: 32774 - topic test-topic --time -2 | awk -F":"'{sum + = $ 3} END {print sum}'12434609`然后差异返回主题中的实际待定消息?我对么? (7认同)

Lun*_*ahl 25

从消费者的角度来看,唯一可以想到的方法是实际使用消息并对消息进行计数.

Kafka代理公开JMX计数器,显示自启动以来收到的消息数量,但您无法知道已经清除了多少消息.

在大多数常见情况下,Kafka中的消息最好被视为无限流,并且获得当前保留在磁盘上的数量的离散值是不相关的.此外,当处理一个代理集群时,事情会变得更加复杂,这些代理在一个主题中都有一部分消息.

  • 请参阅我的回答 /sf/answers/3311970441/。Java Kafka 客户端允许获取该信息。 (2认同)

Rud*_*udy 14

我实际上用它来对我的POC进行基准测试.您要使用ConsumerOffsetChecker的项目.您可以使用如下的bash脚本运行它.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup
Run Code Online (Sandbox Code Playgroud)

以下是结果: 在此输入图像描述 正如您在红色框中看到的,999是主题中当前消息的数量.

更新:自0.10.0以来不推荐使用ConsumerOffsetChecker,您可能希望开始使用ConsumerGroupCommand.

  • 请注意,ConsumerOffsetChecker 已弃用,并将在 0.9.0 之后的版本中删除。请改用 ConsumerGroupCommand。(kafka.tools.ConsumerOffsetChecker$) (2认同)
  • 是的,我就是这么说的。 (2认同)

Tho*_*aux 11

使用https://prestodb.io/docs/current/connector/kafka-tutorial.html

由Facebook提供的超级SQL引擎,连接多个数据源(Cassandra,Kafka,JMX,Redis ......).

PrestoDB作为带有可选工作服务器的服务器运行(有一个没有额外工作者的独立模式),然后使用一个小的可执行JAR(称为presto CLI)来进行查询.

一旦配置好Presto服务器,就可以使用传统的SQL:

SELECT count(*) FROM TOPIC_NAME;
Run Code Online (Sandbox Code Playgroud)


pdp*_*pdp 7

有时感兴趣的是了解每个分区中的消息数,例如在测试自定义分区程序时。随后的步骤已经过测试,可以与Confluent 3.2中的Kafka 0.10.2.1-2一起使用。给定一个Kafka主题,kt以及以下命令行:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Run Code Online (Sandbox Code Playgroud)

打印输出示例输出,显示三个分区中消息的数量:

kt:2:6138
kt:1:6123
kt:0:6137
Run Code Online (Sandbox Code Playgroud)

行数或多或少取决于主题的分区数。

  • 如果启用了日志压缩,那么对分区的偏移量求和可能无法提供主题中消息的确切数量。 (2认同)

Chr*_*ard 6

使用Kafka 2.11-1.0.0的Java客户端,可以做以下事情:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

输出是这样的:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
Run Code Online (Sandbox Code Playgroud)


Bor*_*kov 6

运行以下命令(假设kafka-console-consumer.sh在路径上):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Run Code Online (Sandbox Code Playgroud)


rus*_*ngm 6

由于ConsumerOffsetChecker不再受支持,因此您可以使用此命令来检查主题中的所有消息:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe
Run Code Online (Sandbox Code Playgroud)

LAG主题分区中的邮件计数在哪里:

在此处输入图片说明

您也可以尝试使用kafkacat。这是一个开源项目,可以帮助您从主题和分区中读取消息并将其打印到stdout。这是一个示例,该示例从sample-kafka-topic主题读取最后10条消息,然后退出:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
Run Code Online (Sandbox Code Playgroud)

  • 这个答案有点缺乏精确性。LAG 是等待消费者消费的消息量。不是分区中消息的总数。对于分区中的消息总数来说,一个更准确的值(但仍然有些误导)是 LOG-END-OFFSET。 (3认同)

Eri*_*ski 5

Apache Kafka命令在主题的所有分区上获取未处理的消息:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group
Run Code Online (Sandbox Code Playgroud)

印刷品:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none
Run Code Online (Sandbox Code Playgroud)

列6是未处理的消息。像这样添加它们:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'
Run Code Online (Sandbox Code Playgroud)

awk读取行,跳过标题行并累加第6列,最后打印总和。

版画

5
Run Code Online (Sandbox Code Playgroud)


Aut*_*ike 5

要获取为该主题存储的所有消息,您可以寻找消费者到每个分区的流的开头和结尾,并对结果求和

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
Run Code Online (Sandbox Code Playgroud)

  • 顺便说一句,如果您打开了压缩,则流中可能存在间隙,因此实际消息数可能低于此处计算的总数。要获得准确的总数,您将不得不重播消息并对其进行计数。 (2认同)

LeY*_*ble 5

我也有同样的问题,这就是我在 Kotlin 中从 KafkaConsumer 处得到的答案:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()
Run Code Online (Sandbox Code Playgroud)

非常粗糙的代码,因为我刚刚让它工作,但基本上你想从结束偏移量中减去主题的开始偏移量,这将是该主题的当前消息计数。

您不能仅仅依赖于结束偏移量,因为其他配置(清理策略、保留毫秒等)可能最终导致从您的主题中删除旧消息。偏移量仅向前“移动”,因此起始偏移量将向前移动更接近结束偏移量(或者最终到达相同的值,如果主题现在不包含消息)。

基本上,结束偏移量代表通过该主题的消息总数,两者之间的差异代表该主题当前包含的消息数量。