我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?
以下示例代码尝试将一些案例对象放入数据框中.代码包括案例对象层次结构的定义和使用此特征的案例类:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
Run Code Online (Sandbox Code Playgroud)
执行代码时,我遗憾地遇到以下异常:
java.lang.UnsupportedOperationException: Schema for type …Run Code Online (Sandbox Code Playgroud) 我试图将Spark RDD转换为Spark SQL数据帧toDF().我已成功多次使用此函数,但在这种情况下,我收到编译器错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[com.example.protobuf.SensorData]
Run Code Online (Sandbox Code Playgroud)
这是我的代码如下:
// SensorData is an auto-generated class
import com.example.protobuf.SensorData
def loadSensorDataToRdd : RDD[SensorData] = ???
object MyApplication {
def main(argv: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My application")
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val sensorDataRdd = loadSensorDataToRdd()
val sensorDataDf = sensorDataRdd.toDF() // <-- CAUSES COMPILER ERROR
}
}
Run Code Online (Sandbox Code Playgroud)
我猜测问题出在SensorData类上,这是一个从协议缓冲区自动生成的Java类.为了将RDD转换为数据帧,我该怎么办?
在阅读了几篇关于Spark的DataSet的精彩文章(本,本和本文)后,我完成了下一个DataSet相对于RDD的性能优势:
问题:
RDD[Person].DataSet是否具有高级典型化?IN_MEMORY_ONLY持久性策略时,情况怎么样?无论如何,DataSet会将所有内容序列化吗?它会比RDD有任何性能优势吗?