Spark中的DataFrame,Dataset和RDD之间的区别

men*_*h84 228 apache-spark rdd apache-spark-sql apache-spark-dataset

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?

你能把一个转换成另一个吗?

Jus*_*ony 213

通过DataFrame谷歌搜索"DataFrame定义"来定义A:

数据框是一个表或二维数组结构,其中每列包含一个变量的测量值,每行包含一个案例.

因此,DataFrame由于其表格格式,a 具有额外的元数据,这允许Spark在最终查询上运行某些优化.

RDD,另一方面,仅仅是- [R esilient d istributed d ataset是比较数据的黑盒不能作为可以针对它要执行的操作进行优化的,并不像约束.

但是,你可以从一个数据帧到一个RDD通过它的rdd方法,你可以从一个去RDDDataFrame(如果RDD是表格形式),通过该toDF方法

通常,DataFrame由于内置的​​查询优化,建议尽可能使用.

  • 答案没有提供有关数据集的解释。根据 Spark- The Definitive Guide,数据集是一种类型安全的结构化 API。因此您可以预先提供模式的类型。 (11认同)
  • 谢谢-在原始问题(不包括数据集)之后,对原始标题和描述进行了很好的编辑 (2认同)

Ram*_*ram 198

首先是DataFrame从进化而来SchemaRDD.

将处理方法改为SchemaRDD

是的..之间的转换Dataframe,并RDD是绝对有可能的.

以下是一些示例代码段.

  • df.rddRDD[Row]

以下是一些创建数据框的选项.

  • 1)yourrddOffrow.toDF转换为DataFrame.

  • 2)使用createDataFramesql上下文

    val df = spark.createDataFrame(rddOfRow, schema)

架构可以来自下面的一些选项,如很好的SO帖子所描述的..
来自scala案例类和scala反射api

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
Run Code Online (Sandbox Code Playgroud)

或使用 Encoders

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema
Run Code Online (Sandbox Code Playgroud)

如Schema所描述的也可以使用StructType和 创建StructField

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true)) etc...
Run Code Online (Sandbox Code Playgroud)

图片描述

事实上,现在有3个Apache Spark API ..

在此输入图像描述

  1. RDD API:

RDD,因为1.0版本(弹性分布式数据集)API已在火花.

所述RDDAPI提供了许多转化方法,例如map(), filter(),和reduce(),用于对数据执行计算.这些方法中的每一个都产生RDD表示变换数据的新方法.但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换.动作方法的示例是collect()和 saveAsObjectFile().

RDD示例:

rdd.filter(_.age > 21) // transformation
   .map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
Run Code Online (Sandbox Code Playgroud)

示例:使用RDD按属性过滤

rdd.filter(_.age > 21)
Run Code Online (Sandbox Code Playgroud)
  1. DataFrame API

Spark 1.3引入了一个新的DataFrameAPI作为Project Tungsten计划的一部分,该计划旨在提高Spark的性能和可扩展性.该DataFrameAPI引入了一个模式来描述数据的概念,使星火管理模式和唯一的节点之间传递数据,比使用Java序列化一个更有效的方式.

DataFrameAPI是从根本上不同的RDDAPI,因为它是建立一个关系查询计划,星火的催化剂优化就可以执行的API.对于熟悉构建查询计划的开发人员而言,API很自然

示例SQL样式:

df.filter("age > 21");

限制: 因为代码是按名称引用数据属性,所以编译器无法捕获任何错误.如果属性名称不正确,则只有在创建查询计划时才会在运行时检测到错误.

DataFrameAPI的另一个缺点是它非常以scala为中心,虽然它支持Java,但支持有限.

例如,在DataFrame从现有RDDJava对象创建时,Spark的Catalyst优化器无法推断架构并假定DataFrame中的任何对象都实现了该scala.Product接口.Scala case class解决了这个问题,因为他们实现了这个界面.

  1. Dataset API

DatasetAPI作为Spark 1.6中的API预览发布,旨在提供两全其美的功能; 熟悉的面向对象编程风格和RDDAPI的编译时类型安全性,但具有Catalyst查询优化器的性能优势.数据集也使用与DataFrameAPI 相同的高效堆外存储机制 .

在序列化数据时,DatasetAPI具有编码器的概念, 可在JVM表示(对象)和Spark的内部二进制格式之间进行转换.Spark具有非常先进的内置编码器,它们生成字节代码以与堆外数据交互,并提供对各个属性的按需访问,而无需对整个对象进行反序列化.Spark尚未提供用于实现自定义编码器的API,但计划在将来的版本中使用.

此外,DatasetAPI旨在与Java和Scala同样良好地工作.使用Java对象时,重要的是它们完全符合bean.

示例DatasetAPI SQL样式:

dataset.filter(_.age < 21);
Run Code Online (Sandbox Code Playgroud)

评价差异.之间DataFrame&DataSet: 在此输入图像描述

进一步阅读... databricks 文章

  • @ neelesh-srinivasan:以数据帧语法df.filter(“ age&gt; 21”);为例,只能在运行时对其进行评估/分析。自其字符串。如果是数据集,则数据集符合Bean。所以年龄是豆子的财产。如果您的bean中没有age属性,那么您将在ie编译时就早早知道(即dataset.filter(_。age &lt;21);`)。分析错误可以重命名为评估错误。 (5认同)
  • 第一张图片有误导性。Python 中不提供数据集 (3认同)

Ami*_*bey 126

Apache Spark提供三种类型的API

  1. RDD
  2. 数据帧
  3. 数据集

比较RDD,Dataframe和Dataset API

以下是RDD,Dataframe和Dataset之间的API比较.

RDD

Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨群集节点分区的元素的集合,可以并行操作.

RDD特点: -

  • 分布式集合:
    RDD使用MapReduce操作,广泛用于在集群上使用并行分布式算法处理和生成大型数据集.它允许用户使用一组高级操作符编写并行计算,而不必担心工作分配和容错.

  • 不可变: RDD由分区的记录集合组成.分区是RDD中并行性的基本单元,每个分区是数据的一个逻辑分区,它是不可变的,并通过现有分区上的一些转换创建.可变性有助于实现计算的一致性.

  • 容错: 在我们丢失一些RDD分区的情况下,我们可以在lineage中重放该分区上的转换来实现相同的计算,而不是跨多个节点进行数据复制.这个特性是RDD的最大好处,因为它节省了在数据管理和复制方面做了很多努力,从而实现了更快的计算.

  • 延迟评估: Spark中的所有转换都是惰性的,因为它们不会立即计算结果.相反,他们只记得应用于某些基础数据集的转换.仅当操作需要将结果返回到驱动程序时才会计算转换.

  • 功能转换: RDD支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在数据集上运行计算后将值返回到驱动程序).

  • 数据处理格式:
    它可以轻松高效地处理结构化数据和非结构化数据.

  • 支持的编程语言:
    RDD API可用于Java,Scala,Python和R.

RDD限制: -

  • 没有内置的优化引擎: 当使用结构化数据时,RDD无法利用Spark的高级优化器,包括催化剂优化器和Tungsten执行引擎.开发人员需要根据其属性优化每个RDD.

  • 处理结构化数据: 与Dataframe和数据集不同,RDD不会推断摄取数据的模式,并且需要用户指定它.

Dataframes

Spark在Spark 1.3版本中引入了Dataframes.Dataframe克服了RDD所面临的主要挑战.

DataFrame是组织到命名列中的分布式数据集合.它在概念上等同于关系数据库或R/Python Dataframe中的表.与Dataframe一起,Spark还引入了催化剂优化器,它利用高级编程功能构建可扩展的查询优化器.

数据帧功能: -

  • 行对象的分布式集合: DataFrame是组织成命名列的分布式数据集合.它在概念上等同于关系数据库中的表,但在引擎盖下具有更丰富的优化.

  • 数据处理: 处理结构化和非结构化数据格式(Avro,CSV,弹性搜索和Cassandra)和存储系统(HDFS,HIVE表,MySQL等).它可以从所有这些不同的数据源读取和写入.

  • 使用催化剂优化器进行优化: 它为SQL查询和DataFrame API提供支持.Dataframe分四个阶段使用催化剂树转换框架,

    1.Analyzing a logical plan to resolve references
    2.Logical plan optimization
    3.Physical planning
    4.Code generation to compile parts of the query to Java bytecode.
    
    Run Code Online (Sandbox Code Playgroud)
  • Hive兼容性: 使用Spark SQL,您可以在现有的Hive仓库上运行未修改的Hive查询.它重用了Hive前端和MetaStore,使您可以完全兼容现有的Hive数据,查询和UDF.

  • Tungsten: Tungsten提供物理执行后端,用于显式管理内存并动态生成字节码以进行表达式评估.

  • 支持的编程语言:
    Dataframe API可用于Java,Scala,Python和R.

数据帧限制: -

  • 编译时类型安全: 正如所讨论的,Dataframe API不支持编译时安全性,这限制了您在不知道结构时操纵数据.以下示例在编译期间有效.但是,执行此代码时将出现运行时异常.

例:

case class Person(name : String , age : Int) 
val dataframe = sqlContext.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name
Run Code Online (Sandbox Code Playgroud)

当您使用多个转换和聚合步骤时,这尤其具有挑战性.

  • 无法对域对象(丢失域对象)进行操作: 将域对象转换为数据帧后,无法从中重新生成它.在下面的示例中,一旦我们从personRDD创建personDF,我们将不会恢复Person类的原始RDD(RDD [Person]).

例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
Run Code Online (Sandbox Code Playgroud)

数据集API

Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口.它是一个强类型,不可变的对象集合,映射到关系模式.

在数据集的核心,API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换.表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率.Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean.

数据集特点: -

  • 提供RDD和Dataframe的最佳 功能: RDD(函数式编程,类型安全),DataFrame(关系模型,查询优化,钨执行,排序和混洗)

  • 编码器: 通过使用编码器,可以轻松地将任何JVM对象转换为数据集,从而允许用户使用结构化和非结构化数据,而不像Dataframe.

  • 支持的编程语言: Datasets API目前仅在Scala和Java中可用.1.6版目前不支持Python和R. Python支持定于2.0版.

  • 类型安全: 数据集API提供编译时安全性,这在Dataframe中是不可用的.在下面的示例中,我们可以看到Dataset如何使用编译lambda函数对域对象进行操作.

例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
Run Code Online (Sandbox Code Playgroud)
  • 可互操作:数据集允许您轻松地将现有RDD和数据帧转换为数据集而无需样板代码.

数据集API限制: -

  • 需要对String进行类型转换: 从数据集中查询数据当前要求我们将类中的字段指定为字符串.一旦我们查询了数据,就会强制将列转换为所需的数据类型.另一方面,如果我们在数据集上使用map操作,它将不使用Catalyst优化器.

例:

ds.select(col("name").as[String], $"age".as[Int]).collect()
Run Code Online (Sandbox Code Playgroud)

不支持Python和R:从1.6版开始,Datasets仅支持Scala和Java.Python支持将引入Python 2.0.

与现有的RDD和Dataframe API相比,Datasets API带来了一些优势,具有更好的类型安全性和函数式编程.由于API中的类型转换要求的挑战,您仍然不会需要类型安全性并且会使代码变得脆弱.

  • 来自https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes,在Scala API中,DataFrame只是数据集[Row]的类型别名. (2认同)

mrs*_*vas 40

所有(RDD,DataFrame和DataSet)在一张图片中

RDD与DataFrame对比DataSet

图片学分

RDD

RDD 是一个容错的容错集合,可以并行操作.

数据帧

RDD是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框,但在底层具有更丰富的优化.

数据集

DataFrame是一个分布式数据集合.数据集是Spark 1.6中添加的一个新接口,它提供了RDD优势 (强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎优点.


注意:

DataFrameScala/Java 中Rows()的数据集通常称为DataFrame.


很好地比较了所有这些与代码片段

RDD vs DataFrame与带有代码的DataSet

资源


问:你可以将一个转换为另一个,如RDD到DataFrame,反之亦然?

是的,两者都有可能

1. DatasetDatasetDataset[Row]

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
Run Code Online (Sandbox Code Playgroud)

更多方法:在Spark中将RDD对象转换为Dataframe

2. Nice comparison of all of them with a code snippet./ RDDDataFrame.toDF()方法

val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
Run Code Online (Sandbox Code Playgroud)


Ven*_*ive 25

简单来说RDD就是核心组件,但它DataFrame是Spark 1.30中引入的API.

RDD

调用数据分区的集合RDD.这些RDD必须遵循以下几个属性:

  • 一成不变的,
  • 容错,
  • 分散式,
  • 更多.

RDD是结构化的或非结构化的.

数据帧

DataFrame是Scala,Java,Python和R中可用的API.它允许处理任何类型的结构化和半结构化数据.要定义DataFrame,组织成命名列的分布式数据集合称为DataFrame.您可以轻松地优化RDDsDataFrame.您可以使用一次处理JSON数据,镶木地板数据,HiveQL数据DataFrame.

val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")

val sample_DF = sampleRDD.toDF()
Run Code Online (Sandbox Code Playgroud)

这里Sample_DF考虑为DataFrame.sampleRDD是(原始数据)调用RDD.


Yuv*_*kov 24

因为DataFrame弱类型而开发人员没有获得类型系统的好处.例如,假设您想从SQL读取内容并在其上运行一些聚合:

val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))
Run Code Online (Sandbox Code Playgroud)

当你说people("deptId"),你没有回来Int,或者Long你正在找回Column你需要操作的物体.在具有诸如Scala之类的丰富类型系统的语言中,最终会失去所有类型安全性,从而增加了在编译时可以发现的事物的运行时错误的数量.

相反,DataSet[T]是打字的.当你这样做时:

val people: People = val people = sqlContext.read.parquet("...").as[People]
Run Code Online (Sandbox Code Playgroud)

你实际上是在找一个People对象,它deptId是一个实际的整数类型而不是列类型,因此利用了类型系统.

从Spark 2.0开始,DataFrame和DataSet API将统一起来,其中DataFrame将是一个类型别名DataSet[Row].

  • 确切地说,对于Spark 2.0`Dataframe` [只是一个别名](https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/sql/core/src/main/scala/org/apache/spark/sql /package.scala#L45)用于`Dataset [Row]` (5认同)

vaq*_*han 10

大多数答案都是正确的,只想在这里添加一点

在Spark 2.0中,两个API(DataFrame + DataSet)将统一到一个API中.

"统一DataFrame和数据集:在Scala和Java中,DataFrame和Dataset已经统一,即DataFrame只是Row数据集的类型别名.在Python和R中,由于缺乏类型安全性,DataFrame是主要的编程接口."

数据集与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的编码器来序列化对象以便通过网络进行处理或传输.

Spark SQL支持两种不同的方法将现有RDD转换为数据集.第一种方法使用反射来推断包含特定类型对象的RDD的模式.这种基于反射的方法可以使代码更加简洁,并且在编写Spark应用程序时已经了解了模式.

创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD.虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集.

在这里,您可以找到RDD tof数据框对话答案

如何将rdd对象转换为spark中的dataframe


Sil*_*aze 9

DataFrame等同于RDBMS中的表,也可以通过类似于RDD中"本机"分布式集合的方式进行操作.与RDD不同,Dataframes跟踪架构并支持各种关系操作,从而实现更优化的执行.每个DataFrame对象代表一个逻辑计划,但由于它们的"惰性"特性,在用户调用特定的"输出操作"之前不会执行任何操作.

  • 顺便说一下,R​​DD也很懒。 (2认同)

Nee*_*thu 5

Dataframe是Row对象的RDD,每个对象代表一条记录.Dataframe还知道其行的模式(即数据字段).虽然Dataframes看起来像常规RDD,但在内部它们以更有效的方式存储数据,利用其架构.此外,它们还提供RDD上不可用的新操作,例如运行SQL查询的功能.可以从外部数据源,查询结果或常规RDD创建数据帧.

参考文献:Zaharia M.,et al.学习星火(O'Reilly,2015)


skv*_*yas 5

从使用角度来看,RDD与DataFrame的见解很少:

  1. RDD很棒!因为它们使我们所有人都能灵活地处理几乎任何类型的数据;非结构化,半结构化和结构化数据。由于很多时候数据还没有准备好适合数据框架(甚至是JSON),因此可以使用RDD对数据进行预处理,以使其适合数据框架。RDD是Spark中的核心数据抽象。
  2. 并非所有在RDD上可能进行的转换都可以在DataFrames上进行,例如减去()用于RDD,而exception()用于DataFrame。
  3. 由于DataFrames就像一个关系表,因此在使用集合/关系理论转换时它们遵循严格的规则,例如,如果您想合并两个数据框,则要求两个df具有相同数量的列和关联的列数据类型。列名可以不同。这些规则不适用于RDD。这是一个很好的教程,解释了这些事实。
  4. 使用DataFrame可以提高性能,正如其他已经深入解释的。
  5. 使用DataFrame,您不需要像使用RDD进行编程时那样传递任意函数。
  6. 您需要使用SQLContext / HiveContext对数据框架进行编程,因为它们位于Spark生态系统的SparkSQL区域中,但是对于RDD,您只需要位于Spark Core库中的SparkContext / JavaSparkContext。
  7. 如果可以为其定义架构,则可以从RDD创建df。
  8. 您也可以将df转换为rdd,将rdd转换为df。

希望对您有所帮助!