如何获得kafka主题的最新偏移量?

Nep*_*une 26 java apache-kafka kafka-consumer-api

我正在kafka使用Java 编写消费者.我想保留消息的实时,所以如果等待消费的消息太多,例如1000或更多,我应该放弃未消耗的消息并开始使用最新的消息.

对于这个问题,我尝试比较最后一个提交的偏移量和一个主题的最新偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定量,我将把主题的最新偏移量设置为下一个偏移,以便我可以放弃那些冗余的消息.

现在我的问题是如何获得一个主题的最新偏移,有人说我可以使用旧的消费者,但它太复杂,新的消费者有这个功能吗?

小智 24

新消费者也很复杂.

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

  • 如果您只想计算客户端当前偏移量和最新已知的 kafka 主题偏移量之间的差异,这将不起作用! (2认同)

hia*_*ibe 11

对于Kafka版本:0.10.1.1

// Get the diff of current position and latest offset
Set<TopicPartition> partitions = new HashSet<TopicPartition>();
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition());
partitions.add(actualTopicPartition);
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition);
long actualPosition = consumer.position(actualTopicPartition);          
System.out.println(String.format("diff: %s   (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition));  
Run Code Online (Sandbox Code Playgroud)


Ste*_*ven 11

您还可以使用kafka服务器命令行工具:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name


小智 5

我开发了以下代码来获取偏移状态

import java.util
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

class GetOffsetRange(consumer:KafkaConsumer[String,String]) {

  def getStartOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

    consumer.seekToBeginning(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition)-1)
    }
    return partitionMap
  }

  def getEndOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

    consumer.seekToEnd(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition)-1)
    }
    return partitionMap
  }
}
Run Code Online (Sandbox Code Playgroud)