LST*_*LST 5 scala apache-kafka apache-spark
我目前正在使用spark-streaming-kafka-0-10_2.11kafka 队列连接我的 spark 应用程序。对于 Streams,一切正常。但是对于特定场景,我只需要一次 kafka 队列的全部内容 - 为此我得到了更好地使用的建议KafkaUtils.createRDD(SparkStreaming:读取 Kafka Stream 并将其作为 RDD 提供以供进一步处理)
但是,因为spark-streaming-kafka-0-10_2.11我无法弄清楚如何为我的 Kafka 主题获取最早和最新的偏移量,这是创建 Offset-Range 所需的,我必须使用该createRDD方法。
在不打开流的情况下获得这些偏移量的推荐方法是什么?任何帮助将不胜感激。
小智 0
在阅读了几次讨论后,我可以使用以下命令获取特定分区的最早或最新偏移量:
val consumer = new SimpleConsumer(host,port,timeout,bufferSize,"offsetfetcher");
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
return offsets.head
Run Code Online (Sandbox Code Playgroud)
但是,我仍然不知道如何通过 KafkaUtils.createRDD 方法在 kafka_consumer.sh CLI 命令中复制“from_beginning”的行为。
| 归档时间: |
|
| 查看次数: |
2026 次 |
| 最近记录: |