Geo*_*ler 23 scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:
df.select("foo")df.select($"foo")myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)df.select("foo")没有地图声明,我怎么能输入?
use*_*411 30
df.select("foo")  和之间的区别df.select($"foo")是签名.前者至少需要一个String,后者需要零或更多Columns.除此之外没有实际的区别.myDataSet.map(foo.someVal)类型检查,但作为对象的任何Dataset操作使用RDD,并与DataFrame操作进行比较,会产生很大的开销.我们来看一个简单的例子:
case class FooBar(foo: Int, bar: String)
val ds = Seq(FooBar(1, "x")).toDS
ds.map(_.foo).explain
== Physical Plan ==
*SerializeFromObject [input[0, int, true] AS value#123]
+- *MapElements <function1>, obj#122: int
   +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
      +- LocalTableScan [foo#117, bar#118]
正如您所看到的,此执行计划需要访问所有字段并且必须DeserializeToObject.
不可以.通常,其他方法不是语法糖,并产生明显不同的执行计划.例如:
ds.select($"foo").explain
== Physical Plan ==
LocalTableScan [foo#117]
与之前显示的计划相比,它可以直接访问列.它不是API的限制,而是操作语义差异的结果.
如果没有map语句,我怎么能df.select("foo")类型安全?
没有这样的选择.虽然键入的列允许您静态Dataset转换为另一个静态类型Dataset:
ds.select($"bar".as[Int])
没有类型安全.还有一些其他尝试包括类型安全优化操作,如类型聚合,但这个实验API.
为什么我应该使用UDF/UADF而不是地图
这完全取决于你.Spark中的每个分布式数据结构都有其自身的优点和缺点(例如,参见Spark UDAF with ArrayType作为bufferSchema性能问题).
就个人而言,我发现静态类型Dataset是最不实用的:
不要提供相同的优化范围Dataset[Row](尽管它们共享存储格式和一些执行计划优化,但它不能完全受益于代码生成或堆外存储),也不能访问所有的分析功能DataFrame.
类型转换是黑盒子,有效地为优化器创建分析障碍.例如,不能通过类型转换推送选择(过滤器):
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
   +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
      +- Exchange hashpartitioning(foo#133, 200)
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
            +- LocalTableScan [foo#133, bar#134]
相比:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
   +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
      +- *Filter (foo#133 = 1)
         +- LocalTableScan [foo#133, bar#134] 
这会影响谓词下推或投影下推等功能.
没有像RDDs本机支持的只有一小部分类型一样灵活.
Encoders时,"类型安全" 是有争议的.由于数据形状不使用签名进行编码,因此编译器只能验证是否存在.DatasetasEncoder相关问题:
| 归档时间: | 
 | 
| 查看次数: | 4687 次 | 
| 最近记录: |