Ben*_*zzo 6 apache-kafka apache-spark spark-streaming kafka-consumer-api
我正在尝试使用Spark Direct Stream获取并存储Kafka中特定消息的偏移量.查看Spark文档很容易获得每个分区的范围偏移量,但我需要的是在完全扫描队列后存储主题的每条消息的起始偏移量.
是的,您可以使用 允许您访问的MessageAndMetadata版本.createDirectStreammessage metadata
你可以在这里找到返回Dstream的例子tuple3.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
Run Code Online (Sandbox Code Playgroud)
在上面的例子tuple3._1将有topic,tuple3._2将有offset和tuple3._3将有message.
希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
878 次 |
| 最近记录: |