为什么谓词下推没有在类型化数据集API中使用(与非类型化数据框架API相比)?

Paw*_*oda 9 dataframe apache-spark apache-spark-sql apache-spark-dataset

我一直认为数据集/数据帧API是相同的......唯一的区别是数据集API将为您提供编译时安全性.对 ?

那么......我的案子非常简单:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()
Run Code Online (Sandbox Code Playgroud)

从第一个示例解释将显示它不执行谓词下推(注意空PushedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...
Run Code Online (Sandbox Code Playgroud)

虽然第二个样本将正确执行(Notice PushedFilters):

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...
Run Code Online (Sandbox Code Playgroud)

所以问题是..我怎样才能使用DS Api,并且编译时安全..,谓词下推按预期工作????

可能吗 ?如果不是..这是否意味着DS api为您提供编译时安全性......但是以性能为代价!! ??? (在这种情况下DF会快得多..特别是在处理大型镶木地板文件时)

Jac*_*ski 16

这就是你的物理计划中的线,你应该记住知道Dataset[T]DataFrame(和Dataset[Row])之间的真正区别.

Filter <function1>.apply
Run Code Online (Sandbox Code Playgroud)

我一直说人们应该远离类型化的数据集API并继续使用无类型的DataFrame API,因为Scala代码在很多地方成为优化器的黑盒子.您只需点击其中一个,并考虑Spark SQL远离JVM以避免GC的所有对象的反序列化.每次触摸对象时,你都要求Spark SQL反序列化对象并将它们加载到JVM上,从而给GC带来很大的压力(与非类型化的DataFrame API相比,使用类型化的数据集API会更频繁地触发它).

UDF是Blackbox - 除非你没有选择,否则不要使用它们.


在我在dev@spark.ao邮件列表上提出同样的问题后引用Reynold Xin:

UDF是一个黑盒子,因此Spark无法知道它正在处理什么.在一些简单的情况下,我们可以分析UDF字节代码并推断它在做什么,但一般来说这很难做到.

这种情况下有一张JIRA票据SPARK-14083分析JVM字节码并将关闭转换成Catalyst表达式,但正如有人所说(我认为它是推特上的Adam B.)很快就会有一种笑话. .

数据集API的一大优势是类型安全,由于严重依赖用户定义的闭包/ lambda而以性能为代价.这些闭包通常比表达式慢,因为我们可以更灵活地优化表达式(已知数据类型,无虚函数调用等).在许多情况下,查看这些闭包的字节代码并弄清楚他们想要做什么实际上并不是很困难.如果我们能够理解它们,那么我们可以将它们直接转换为Catalyst表达式,以实现更优化的执行.


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()
Run Code Online (Sandbox Code Playgroud)

以上代码等同于以下内容:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()
Run Code Online (Sandbox Code Playgroud)

someCodeSparkSQLCannotDoMuchOutOfIt 正是你把优化放在一边的地方,让Spark Optimizer跳过它.