sta*_*010 5 scala apache-spark apache-spark-sql
我试图将Spark RDD转换为Spark SQL数据帧toDF().我已成功多次使用此函数,但在这种情况下,我收到编译器错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[com.example.protobuf.SensorData]
Run Code Online (Sandbox Code Playgroud)
这是我的代码如下:
// SensorData is an auto-generated class
import com.example.protobuf.SensorData
def loadSensorDataToRdd : RDD[SensorData] = ???
object MyApplication {
def main(argv: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My application")
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val sensorDataRdd = loadSensorDataToRdd()
val sensorDataDf = sensorDataRdd.toDF() // <-- CAUSES COMPILER ERROR
}
}
Run Code Online (Sandbox Code Playgroud)
我猜测问题出在SensorData类上,这是一个从协议缓冲区自动生成的Java类.为了将RDD转换为数据帧,我该怎么办?
编译错误的原因是范围内没有将 a withEncoder转换为 a of 的范围。RDDcom.example.protobuf.SensorDataDatasetcom.example.protobuf.SensorData
Encoders(ExpressionEncoders准确地说)用于InternalRow根据模式(通常是案例类或Java bean)将对象转换为JVM对象。
希望您可以使用对象的方法创建Encoder自定义 Java 类。org.apache.spark.sql.Encodersbean
为 T 类型的 Java Bean 创建编码器。
像下面这样:
import org.apache.spark.sql.Encoders
implicit val SensorDataEncoder = Encoders.bean(classOf[com.example.protobuf.SensorData])
Run Code Online (Sandbox Code Playgroud)
如果SensorData使用不受支持的类型,您将不得不map使用一些更简单的RDD[SensorData]类型RDD,例如字段的元组,然后才可以期待toDF工作。
| 归档时间: |
|
| 查看次数: |
1869 次 |
| 最近记录: |