dbu*_*ush 3 java apache-kafka apache-zookeeper
有一个应用程序(不是我的)从Kafka读取消息,对它们进行一些处理,并将记录存储在数据库中.我已经用Java编写了一个程序,它以给定的速率将消息写入队列.现在,它通过在测试运行结束时查询数据库来确定性能的简单度量,以确保记录在=记录中.但是,我想扩展它以定期检查队列,以查看应用程序尚未处理的待处理消息数量,以查看它是否已备份.
我想我可以在Zookeeper中检查应用程序组ID的偏移量.我查看了Kafka文档,但它只提供了基本的消费者示例,并且API文档充其量只是稀疏,因此我不确定如何查找此信息.
我需要调用哪些API才能找出应用程序当前所在队列中的位置,以及该位置后面队列中有多少消息?
我正在使用Kafka 2.10-0.8.2.1和一个Zookeeper实例以及三个Kafka实例,负载测试器正在使用0.8.2.1 Java API.有问题的主题有三个分区(每个Kafka服务器上有一个分区),但是为了测试的目的,只有一个消费者.
我建议查看Kafka中已经提供的工具(如果需要直接调用API,代码在src中可用).特别是,
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1
Run Code Online (Sandbox Code Playgroud)
会告诉你偏移和滞后:
consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)
Run Code Online (Sandbox Code Playgroud)
参考文献: