在 Spark 中创建数据帧时出错

mat*_*sio 5 scala apache-spark spark-streaming apache-spark-sql

我正在尝试在 kafka-spark 流中创建一个数据帧,我已成功将值映射到案例类,但是每当我调用 toDF 方法时,它都会给我错误。**

value toDF 不是 Array[WeatherEvent] [错误] 的成员 可能的原因:可能在“value toDF”之前缺少分号?[错误]
}).toDF("经度", "纬度", "国家", "日出", "日落", "温度", "最低温度", "最高温度", [错误] ^ [错误] 发现一处错误[error] (compile:compileIncremental) 编译失败 [error] 总时间:2 s,完成 2017 年 9 月 27 日 11:49:23 AM

这是我的代码

 val inputStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String ](Array("test"), kafkaParams))
 //  val json = parse(inputStream)


  val processedStream = inputStream
  .flatMap(record => record.value.split(" ").map(payload => {
        //val ts = Timestamp.valueOf(payload(3))
        WeatherEvent(payload(0).toDouble, payload(1).toDouble, payload(2).toString , payload(3).toInt,
                    payload(4).toInt, payload(5).toDouble, payload(6).toDouble, payload(7).toDouble, 
                    payload(8).toDouble, payload(9).toInt, payload(10).toInt, payload(11).toInt, 
                    payload(12).toDouble, payload(13).toDouble)
      }).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax", 
              "pressure", "humidity", "cloudiness", "id", "wind_speed", "wind_deg")
 )
Run Code Online (Sandbox Code Playgroud)

谢谢 **

Ish*_*mar 2

toDF() 是 sqlContext 中定义的隐式方法。toDF() 用于将 RDD 转换为 Dataframe。这里你从 Kafka 获得一个流,我指的是 Dstreams。要将其转换为 DF,您需要使用Transform API 或foreachRDD API 处理 Dstreams 中的每个 RDD。下面我使用 foreachRDD 转换将 RDD 转换为 Dataframe

val data=KafkaUtils.createStream(ssc, zkQuorum, "GroupName", topics).map(x=>x._2)
val lines12=data.foreachRDD(x=>{
  val df=x.flatMap(x => x.split(",")).map(x=>(x(0),x(1))).toDF()
}
Run Code Online (Sandbox Code Playgroud)