I am new to Spark. Could you let me know how to read JSON data from a Kafka topic using Scala in Apache Spark?

Thanks.
The simplest approach is to use the DataFrame abstraction that ships with Spark.
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils

val sqlContext = new SQLContext(sc)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopicName"))

stream.foreachRDD { rdd =>
  // Each Kafka message is a (key, value) pair; the JSON payload is the value.
  val dataFrame = sqlContext.read.json(rdd.map(_._2)) // converts the JSON strings to a DataFrame
  // Do your operations on this DataFrame. You won't even need a model class.
}
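Note that the snippets here assume an existing SparkContext (sc), StreamingContext (ssc), and kafkaParams map. A minimal sketch of that setup, using the same direct-stream (Kafka 0.8) API as above; the app name, batch interval, and broker address are all placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("KafkaJsonReader") // placeholder app name
val ssc = new StreamingContext(conf, Seconds(10))        // example 10-second batch interval
val sc = ssc.sparkContext

// The direct stream talks to the Kafka brokers directly, not via ZooKeeper.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092" // placeholder broker address
)

After wiring up the stream as shown above, start processing with ssc.start() followed by ssc.awaitTermination().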
I use the Play framework's library for JSON. You can add it to your project as a standalone module (see the dependency sketch after the example). Usage is as follows:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import play.api.libs.json._

case class MyClass(field1: String,
                   field2: Int)

// Derives both Reads and Writes for MyClass from its fields.
implicit val myClassFormat = Json.format[MyClass]

val kafkaParams = Map[String, String](...here are your params...)

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopicName"))
  .map(m => Json.parse(m._2).as[MyClass]) // parse each message value into MyClass
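Play's JSON support can be pulled in on its own, without the rest of the framework. A sketch of the sbt dependency; the version number is an assumption and should be matched to your Scala version:

libraryDependencies += "com.typesafe.play" %% "play-json" % "2.4.6" // hypothetical version

One caveat: .as[MyClass] throws a JsResultException on malformed input. If the topic can contain bad records, .validate[MyClass] returns a JsResult you can inspect instead of failing the task.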