Spark 2.0 Dataset vs DataFrame

Geo*_*ler 23 scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:

  • 有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我能正确理解吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)
  • 所有其他命令,例如select,..只是语法糖.它们不是类型安全的,可以使用地图代替.如果df.select("foo")没有地图声明,我怎么能输入?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图保留在数据集表示中)?

use*_*411 30

  1. df.select("foo") 和之间的区别df.select($"foo")是签名.前者至少需要一个String,后者需要零或更多Columns.除此之外没有实际的区别.
  2. myDataSet.map(foo.someVal)类型检查,但作为对象的任何Dataset操作使用RDD,并与DataFrame操作进行比较,会产生很大的开销.我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    Run Code Online (Sandbox Code Playgroud)
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    
    Run Code Online (Sandbox Code Playgroud)

    正如您所看到的,此执行计划需要访问所有字段并且必须DeserializeToObject.

  3. 不可以.通常,其他方法不是语法糖,并产生明显不同的执行计划.例如:

    ds.select($"foo").explain
    
    Run Code Online (Sandbox Code Playgroud)
    == Physical Plan ==
    LocalTableScan [foo#117]
    
    Run Code Online (Sandbox Code Playgroud)

    与之前显示的计划相比,它可以直接访问列.它不是API的限制,而是操作语义差异的结果.

  4. 如果没有map语句,我怎么能df.select("foo")类型安全?

    没有这样的选择.虽然键入的列允许您静态Dataset转换为另一个静态类型Dataset:

    ds.select($"bar".as[Int])
    
    Run Code Online (Sandbox Code Playgroud)

    没有类型安全.还有一些其他尝试包括类型安全优化操作,如类型聚合,但这个实验API.

  5. 为什么我应该使用UDF/UADF而不是地图

    这完全取决于你.Spark中的每个分布式数据结构都有其自身的优点和缺点(例如,参见Spark UDAF with ArrayType作为bufferSchema性能问题).

就个人而言,我发现静态类型Dataset是最不实用的:

  • 不要提供相同的优化范围Dataset[Row](尽管它们共享存储格式和一些执行计划优化,但它不能完全受益于代码生成或堆外存储),也不能访问所有的分析功能DataFrame.

  • 类型转换是黑盒子,有效地为优化器创建分析障碍.例如,不能通过类型转换推送选择(过滤器):

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    Run Code Online (Sandbox Code Playgroud)
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    
    Run Code Online (Sandbox Code Playgroud)

    相比:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    Run Code Online (Sandbox Code Playgroud)
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    
    Run Code Online (Sandbox Code Playgroud)

    这会影响谓词下推或投影下推等功能.

  • 没有像RDDs本机支持的只有一小部分类型一样灵活.

  • 使用方法转换Encoders时,"类型安全" 是有争议的.由于数据形状不使用签名进行编码,因此编译器只能验证是否存在.DatasetasEncoder

相关问题: