计数存储在kafka主题中的消息数

jac*_*hik 23 apache-kafka jms-topic kafka-consumer-api

我正在使用0.9.0.0版本的Kafka,我想在不使用管理脚本kafka-console-consumer.sh的情况下计算主题中的消息数.

我已经尝试了答案Java中的所有命令,如何在apache kafka中获取主题中的消息数量, 但没有人产生结果.有人可以帮我从这里出去吗?

ame*_*tic 56

您可以尝试执行以下命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1
Run Code Online (Sandbox Code Playgroud)

然后,总结每个分区的所有计数.

  • 您应该计算最新偏移量和最早偏移量之间的差异之和。(--time -2) 参数给出最早的。 (2认同)

Que*_*eff 6

您可以使用以下内容来总结所有计数:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2:9092>>... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
Run Code Online (Sandbox Code Playgroud)

  • 谢谢!更简单的总结:kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $ KAFKA_CLUSTER_HOSTS --topic $ TOPIC_NAME --time -1 | tr":"""| awk'{sum + = $ 3} END {print sum}' (3认同)
  • @ozma 你也可以使用 `awk -F:` 代替 `tr` :D (2认同)

小智 6

从技术上讲,您可以简单地使用主题中的所有消息并计算它们:

例:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*
Run Code Online (Sandbox Code Playgroud)

但是,kafka.tools.GetOffsetShell方法将为您提供偏移,而不是主题中的实际消息数.这意味着如果主题被压缩,如果您通过消费或通过读取偏移来计算消息,则会得到两个不同的数字.

主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics

  • 除非时间不是相对的,否则从 Kafka 中的主题中读取潜在的数百万? (2认同)