Bal*_*eck 7 apache-kafka apache-spark
KafkaUtils.createRDD将offsetRanges作为参数.我不知道我要阅读的主题的偏移量.我想最多阅读主题中的前30条消息.
我看到有一个KafkaCluster.html#getLatestLeaderOffsets,但它被注释为Develop API.
有没有公开的方法来确定某个主题的最早和最新补偿?
这并不是那么简单的事情,因为只有各个代理知道给定主题/分区的最新偏移量信息是什么。
你可以做一个OffsetRequest. 下面将返回主题/分区的最早和最新偏移量(它是 Scala,但如果您不使用 Scala,您应该能够理解)。
请注意,您必须使用SimpleConsumer连接到作为所请求分区的领导者的代理。SimpleConsumer通常我所做的是,为每个经纪人创建一个。然后我执行元数据请求并获取分区到领导者的映射,然后对于每个分区我执行以下操作:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)))
  val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0))
  else (0, -1)
}
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助。
|   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           1057 次  |  
        
|   最近记录:  |