如何使用toDF()将自定义Java类对象的RDD转换为DataFrame?

sta*_*010 5 scala apache-spark apache-spark-sql

我试图将Spark RDD转换为Spark SQL数据帧toDF().我已成功多次使用此函数,但在这种情况下,我收到编译器错误:

error: value toDF is not a member of org.apache.spark.rdd.RDD[com.example.protobuf.SensorData]
Run Code Online (Sandbox Code Playgroud)

这是我的代码如下:

// SensorData is an auto-generated class
import com.example.protobuf.SensorData
def loadSensorDataToRdd : RDD[SensorData] = ???

object MyApplication {
  def main(argv: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("My application")
    conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val sensorDataRdd = loadSensorDataToRdd()
    val sensorDataDf = sensorDataRdd.toDF() // <-- CAUSES COMPILER ERROR
  }
}
Run Code Online (Sandbox Code Playgroud)

我猜测问题出在SensorData类上,这是一个从协议缓冲区自动生成的Java类.为了将RDD转换为数据帧,我该怎么办?

Jac*_*ski 3

编译错误的原因是范围内没有将 a withEncoder转换为 a of 的范围。RDDcom.example.protobuf.SensorDataDatasetcom.example.protobuf.SensorData

EncodersExpressionEncoders准确地说)用于InternalRow根据模式(通常是案例类或Java bean)将对象转换为JVM对象。

希望您可以使用对象的方法创建Encoder自定义 Java 类。org.apache.spark.sql.Encodersbean

为 T 类型的 Java Bean 创建编码器。

像下面这样:

import org.apache.spark.sql.Encoders
implicit val SensorDataEncoder = Encoders.bean(classOf[com.example.protobuf.SensorData])
Run Code Online (Sandbox Code Playgroud)

如果SensorData使用不受支持的类型,您将不得不map使用一些更简单的RDD[SensorData]类型RDD,例如字段的元组,然后才可以期待toDF工作。