men*_*h84 228 apache-spark rdd apache-spark-sql apache-spark-dataset
我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
Jus*_*ony 213
通过DataFrame谷歌搜索"DataFrame定义"来定义A:
数据框是一个表或二维数组结构,其中每列包含一个变量的测量值,每行包含一个案例.
因此,DataFrame由于其表格格式,a 具有额外的元数据,这允许Spark在最终查询上运行某些优化.
的RDD,另一方面,仅仅是- [R esilient d istributed d ataset是比较数据的黑盒不能作为可以针对它要执行的操作进行优化的,并不像约束.
但是,你可以从一个数据帧到一个RDD通过它的rdd方法,你可以从一个去RDD到DataFrame(如果RDD是表格形式),通过该toDF方法
通常,DataFrame由于内置的查询优化,建议尽可能使用.
Ram*_*ram 198
首先是
DataFrame从进化而来SchemaRDD.
是的..之间的转换Dataframe,并RDD是绝对有可能的.
以下是一些示例代码段.
df.rdd 是 RDD[Row]以下是一些创建数据框的选项.
1)yourrddOffrow.toDF转换为DataFrame.
2)使用createDataFramesql上下文
val df = spark.createDataFrame(rddOfRow, schema)
架构可以来自下面的一些选项,如很好的SO帖子所描述的..
来自scala案例类和scala反射apiRun Code Online (Sandbox Code Playgroud)import org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]或使用
EncodersRun Code Online (Sandbox Code Playgroud)import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema如Schema所描述的也可以使用
StructType和 创建StructFieldRun Code Online (Sandbox Code Playgroud)val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
RDD API:的
RDD,因为1.0版本(弹性分布式数据集)API已在火花.所述
RDDAPI提供了许多转化方法,例如map(),filter(),和reduce(),用于对数据执行计算.这些方法中的每一个都产生RDD表示变换数据的新方法.但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换.动作方法的示例是collect()和saveAsObjectFile().
RDD示例:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
Run Code Online (Sandbox Code Playgroud)
示例:使用RDD按属性过滤
rdd.filter(_.age > 21)
Run Code Online (Sandbox Code Playgroud)
DataFrame APISpark 1.3引入了一个新的
DataFrameAPI作为Project Tungsten计划的一部分,该计划旨在提高Spark的性能和可扩展性.该DataFrameAPI引入了一个模式来描述数据的概念,使星火管理模式和唯一的节点之间传递数据,比使用Java序列化一个更有效的方式.该
DataFrameAPI是从根本上不同的RDDAPI,因为它是建立一个关系查询计划,星火的催化剂优化就可以执行的API.对于熟悉构建查询计划的开发人员而言,API很自然
示例SQL样式:
df.filter("age > 21");
限制: 因为代码是按名称引用数据属性,所以编译器无法捕获任何错误.如果属性名称不正确,则只有在创建查询计划时才会在运行时检测到错误.
DataFrameAPI的另一个缺点是它非常以scala为中心,虽然它支持Java,但支持有限.
例如,在DataFrame从现有RDDJava对象创建时,Spark的Catalyst优化器无法推断架构并假定DataFrame中的任何对象都实现了该scala.Product接口.Scala case class解决了这个问题,因为他们实现了这个界面.
Dataset API该
DatasetAPI作为Spark 1.6中的API预览发布,旨在提供两全其美的功能; 熟悉的面向对象编程风格和RDDAPI的编译时类型安全性,但具有Catalyst查询优化器的性能优势.数据集也使用与DataFrameAPI 相同的高效堆外存储机制 .在序列化数据时,
DatasetAPI具有编码器的概念, 可在JVM表示(对象)和Spark的内部二进制格式之间进行转换.Spark具有非常先进的内置编码器,它们生成字节代码以与堆外数据交互,并提供对各个属性的按需访问,而无需对整个对象进行反序列化.Spark尚未提供用于实现自定义编码器的API,但计划在将来的版本中使用.此外,
DatasetAPI旨在与Java和Scala同样良好地工作.使用Java对象时,重要的是它们完全符合bean.
示例DatasetAPI SQL样式:
dataset.filter(_.age < 21);
Run Code Online (Sandbox Code Playgroud)
进一步阅读... databricks 文章
Ami*_*bey 126
Apache Spark提供三种类型的API
以下是RDD,Dataframe和Dataset之间的API比较.
Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨群集节点分区的元素的集合,可以并行操作.
分布式集合:
RDD使用MapReduce操作,广泛用于在集群上使用并行分布式算法处理和生成大型数据集.它允许用户使用一组高级操作符编写并行计算,而不必担心工作分配和容错.
不可变: RDD由分区的记录集合组成.分区是RDD中并行性的基本单元,每个分区是数据的一个逻辑分区,它是不可变的,并通过现有分区上的一些转换创建.可变性有助于实现计算的一致性.
容错: 在我们丢失一些RDD分区的情况下,我们可以在lineage中重放该分区上的转换来实现相同的计算,而不是跨多个节点进行数据复制.这个特性是RDD的最大好处,因为它节省了在数据管理和复制方面做了很多努力,从而实现了更快的计算.
延迟评估: Spark中的所有转换都是惰性的,因为它们不会立即计算结果.相反,他们只记得应用于某些基础数据集的转换.仅当操作需要将结果返回到驱动程序时才会计算转换.
功能转换: RDD支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在数据集上运行计算后将值返回到驱动程序).
数据处理格式:
它可以轻松高效地处理结构化数据和非结构化数据.
支持的编程语言:
RDD API可用于Java,Scala,Python和R.
没有内置的优化引擎: 当使用结构化数据时,RDD无法利用Spark的高级优化器,包括催化剂优化器和Tungsten执行引擎.开发人员需要根据其属性优化每个RDD.
处理结构化数据: 与Dataframe和数据集不同,RDD不会推断摄取数据的模式,并且需要用户指定它.
Spark在Spark 1.3版本中引入了Dataframes.Dataframe克服了RDD所面临的主要挑战.
DataFrame是组织到命名列中的分布式数据集合.它在概念上等同于关系数据库或R/Python Dataframe中的表.与Dataframe一起,Spark还引入了催化剂优化器,它利用高级编程功能构建可扩展的查询优化器.
行对象的分布式集合: DataFrame是组织成命名列的分布式数据集合.它在概念上等同于关系数据库中的表,但在引擎盖下具有更丰富的优化.
数据处理: 处理结构化和非结构化数据格式(Avro,CSV,弹性搜索和Cassandra)和存储系统(HDFS,HIVE表,MySQL等).它可以从所有这些不同的数据源读取和写入.
使用催化剂优化器进行优化: 它为SQL查询和DataFrame API提供支持.Dataframe分四个阶段使用催化剂树转换框架,
1.Analyzing a logical plan to resolve references
2.Logical plan optimization
3.Physical planning
4.Code generation to compile parts of the query to Java bytecode.
Run Code Online (Sandbox Code Playgroud)Hive兼容性: 使用Spark SQL,您可以在现有的Hive仓库上运行未修改的Hive查询.它重用了Hive前端和MetaStore,使您可以完全兼容现有的Hive数据,查询和UDF.
Tungsten: Tungsten提供物理执行后端,用于显式管理内存并动态生成字节码以进行表达式评估.
支持的编程语言:
Dataframe API可用于Java,Scala,Python和R.
例:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
Run Code Online (Sandbox Code Playgroud)
当您使用多个转换和聚合步骤时,这尤其具有挑战性.
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
Run Code Online (Sandbox Code Playgroud)
Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口.它是一个强类型,不可变的对象集合,映射到关系模式.
在数据集的核心,API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换.表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率.Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean.
提供RDD和Dataframe的最佳 功能: RDD(函数式编程,类型安全),DataFrame(关系模型,查询优化,钨执行,排序和混洗)
编码器: 通过使用编码器,可以轻松地将任何JVM对象转换为数据集,从而允许用户使用结构化和非结构化数据,而不像Dataframe.
支持的编程语言: Datasets API目前仅在Scala和Java中可用.1.6版目前不支持Python和R. Python支持定于2.0版.
类型安全: 数据集API提供编译时安全性,这在Dataframe中是不可用的.在下面的示例中,我们可以看到Dataset如何使用编译lambda函数对域对象进行操作.
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
Run Code Online (Sandbox Code Playgroud)
例:
ds.select(col("name").as[String], $"age".as[Int]).collect()
Run Code Online (Sandbox Code Playgroud)
不支持Python和R:从1.6版开始,Datasets仅支持Scala和Java.Python支持将引入Python 2.0.
与现有的RDD和Dataframe API相比,Datasets API带来了一些优势,具有更好的类型安全性和函数式编程.由于API中的类型转换要求的挑战,您仍然不会需要类型安全性并且会使代码变得脆弱.
mrs*_*vas 40
RDD是一个容错的容错集合,可以并行操作.
RDD是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框,但在底层具有更丰富的优化.
DataFrame是一个分布式数据集合.数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势 (强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的 优点.
注意:
DataFrameScala/Java 中Rows()的数据集通常称为DataFrame.

问:你可以将一个转换为另一个,如RDD到DataFrame,反之亦然?
1. Dataset要Dataset与Dataset[Row]
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
Run Code Online (Sandbox Code Playgroud)
更多方法:在Spark中将RDD对象转换为Dataframe
2. Nice comparison of all of them with a code snippet./ RDD到DataFrame与.toDF()方法
val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
Run Code Online (Sandbox Code Playgroud)
Ven*_*ive 25
简单来说RDD就是核心组件,但它DataFrame是Spark 1.30中引入的API.
调用数据分区的集合RDD.这些RDD必须遵循以下几个属性:
这RDD是结构化的或非结构化的.
DataFrame是Scala,Java,Python和R中可用的API.它允许处理任何类型的结构化和半结构化数据.要定义DataFrame,组织成命名列的分布式数据集合称为DataFrame.您可以轻松地优化RDDs中DataFrame.您可以使用一次处理JSON数据,镶木地板数据,HiveQL数据DataFrame.
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
Run Code Online (Sandbox Code Playgroud)
这里Sample_DF考虑为DataFrame.sampleRDD是(原始数据)调用RDD.
Yuv*_*kov 24
因为DataFrame弱类型而开发人员没有获得类型系统的好处.例如,假设您想从SQL读取内容并在其上运行一些聚合:
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
Run Code Online (Sandbox Code Playgroud)
当你说people("deptId"),你没有回来Int,或者Long你正在找回Column你需要操作的物体.在具有诸如Scala之类的丰富类型系统的语言中,最终会失去所有类型安全性,从而增加了在编译时可以发现的事物的运行时错误的数量.
相反,DataSet[T]是打字的.当你这样做时:
val people: People = val people = sqlContext.read.parquet("...").as[People]
Run Code Online (Sandbox Code Playgroud)
你实际上是在找一个People对象,它deptId是一个实际的整数类型而不是列类型,因此利用了类型系统.
从Spark 2.0开始,DataFrame和DataSet API将统一起来,其中DataFrame将是一个类型别名DataSet[Row].
vaq*_*han 10
大多数答案都是正确的,只想在这里添加一点
在Spark 2.0中,两个API(DataFrame + DataSet)将统一到一个API中.
"统一DataFrame和数据集:在Scala和Java中,DataFrame和Dataset已经统一,即DataFrame只是Row数据集的类型别名.在Python和R中,由于缺乏类型安全性,DataFrame是主要的编程接口."
数据集与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的编码器来序列化对象以便通过网络进行处理或传输.
Spark SQL支持两种不同的方法将现有RDD转换为数据集.第一种方法使用反射来推断包含特定类型对象的RDD的模式.这种基于反射的方法可以使代码更加简洁,并且在编写Spark应用程序时已经了解了模式.
创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD.虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集.
在这里,您可以找到RDD tof数据框对话答案
DataFrame等同于RDBMS中的表,也可以通过类似于RDD中"本机"分布式集合的方式进行操作.与RDD不同,Dataframes跟踪架构并支持各种关系操作,从而实现更优化的执行.每个DataFrame对象代表一个逻辑计划,但由于它们的"惰性"特性,在用户调用特定的"输出操作"之前不会执行任何操作.
Dataframe是Row对象的RDD,每个对象代表一条记录.Dataframe还知道其行的模式(即数据字段).虽然Dataframes看起来像常规RDD,但在内部它们以更有效的方式存储数据,利用其架构.此外,它们还提供RDD上不可用的新操作,例如运行SQL查询的功能.可以从外部数据源,查询结果或常规RDD创建数据帧.
参考文献:Zaharia M.,et al.学习星火(O'Reilly,2015)
希望对您有所帮助!