相关疑难解决方法(0)

如何在java程序中获取kafka消耗滞后

我写了一个java程序来消耗来自kafka的消息.我想监视消耗滞后,如何通过java获取它?

顺便说一句,我使用:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
Run Code Online (Sandbox Code Playgroud)

提前致谢.

java apache-kafka

5
推荐指数
2
解决办法
8568
查看次数

Consumer.endOffsets 在 Kafka 中是如何工作的?

假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移。类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组。

就像是

单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}
Run Code Online (Sandbox Code Playgroud)

多个消费者 - 工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer(); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api spring-kafka

5
推荐指数
1
解决办法
2494
查看次数