Spark - 在不打开流的情况下获取 Kafka 的最早和最新偏移量

LST*_*LST 5 scala apache-kafka apache-spark

我目前正在使用spark-streaming-kafka-0-10_2.11kafka 队列连接我的 spark 应用程序。对于 Streams,一切正常。但是对于特定场景,我只需要一次 kafka 队列的全部内容 - 为此我得到了更好地使用的建议KafkaUtils.createRDDSparkStreaming:读取 Kafka Stream 并将其作为 RDD 提供以供进一步处理

但是,因为spark-streaming-kafka-0-10_2.11我无法弄清楚如何为我的 Kafka 主题获取最早和最新的偏移量,这是创建 Offset-Range 所需的,我必须使用该createRDD方法。

在不打开流的情况下获得这些偏移量的推荐方法是什么?任何帮助将不胜感激。

小智 0

在阅读了几次讨论后,我可以使用以下命令获取特定分区的最早或最新偏移量:

val consumer = new SimpleConsumer(host,port,timeout,bufferSize,"offsetfetcher");
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

return offsets.head
Run Code Online (Sandbox Code Playgroud)

但是,我仍然不知道如何通过 KafkaUtils.createRDD 方法在 kafka_consumer.sh CLI 命令中复制“from_beginning”的行为。