jrj*_*rjd 7 scala apache-spark parquet apache-spark-sql
我正在评估在Spark中加载Parquet文件的不同方法的性能,差异是惊人的.
在我们的Parquet文件中,我们有类似的嵌套case类:
case class C(/* a dozen of attributes*/)
case class B(/* a dozen of attributes*/, cs: Seq[C])
case class A(/* a dozen of attributes*/, bs: Seq[B])
Run Code Online (Sandbox Code Playgroud)
从Parquet文件加载它们需要一段时间.所以我已经完成了从Parquet文件加载案例类的不同方法的基准,并使用Spark 1.6和2.0对一个字段求和.
以下是我做的基准测试的总结:
val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist()
df.count()
// Spark 1.6
// Play Json
// 63.169s
df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption)
.map(_.fieldToSum).sum()
// Direct access to field using Spark Row
// 2.811s
df.map(row => row.getAs[Long]("fieldToSum")).sum()
// Some small library we developed that access fields using Spark Row
// 10.401s
df.toRDD[A].map(_.fieldToSum).sum()
// Dataframe hybrid SQL API
// 0.239s
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Dataset with column selection
// 0.176s
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
// Spark 2.0
// Performance is similar except for:
// Direct access to field using Spark Row
// 23.168s
df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
// Some small library we developed that access fields using Spark Row
// 32.898s
f1DF.toRDD[A].map(_.fieldToSum).sum()
Run Code Online (Sandbox Code Playgroud)
我理解为什么升级到Spark 2.0时使用Spark Row的方法的性能会降低,因为Dataframe现在它只是一个别名Dataset[Row].我想这就是统一接口的成本.
另一方面,我很失望Dataset没有保留承诺:使用RDD样式编码(maps和flatMaps)时的性能比使用Dataset类似DataframeSQL的DSL 时更差.
基本上,为了获得良好的性能,我们需要放弃类型安全.
Dataset使用RDD和Dataset用作这种差异的原因是Dataframe什么?
有没有办法提高编码性能,Dataset以等同RDD风格的编码和SQL风格的编码性能?对于数据工程,使用RDD样式编码要清晰得多.
此外,使用类似SQL的DSL需要展平我们的数据模型,而不是使用嵌套的case类.我是对的,只有平面数据模型才能实现良好的性能?
用作 RDD 的 Dataset 和用作 Dataframe 的 Dataset 之间存在这种差异的原因是什么?
为了获得一些见解,让我们思考一下 Spark SQL 使用的优化。据我所知,与 plain 相比,有三种类型的改进RDD:
现在的问题是,并非所有这些技术在受限编程模型(例如 SQL)之外都有用。
例如,可以下推选择(过滤器),但投影非常有限(您不能真正拥有对象的一部分,可以吗?)。类似地,代码生成依赖于明确定义的语义,并且不容易普遍应用(它基本上是一个编译器,生成可以由 JVM 进一步优化的代码)。
Finalsun.misc.Unsafe是一种提高性能的神奇方法,但它并不是免费的。虽然这里有很多增益,但编码和解码的开销也很大。
使用类似 SQL 的 DSL 需要扁平化我们的数据模型并且不使用嵌套的 case 类。
嵌套结构并不完全是一等公民,并且存在一些记录不足的限制,您仍然可以在这里做很多事情。
升级到 Spark 2.0 时,使用 Spark Row 的方法的性能会下降,因为 Dataframe 现在只是 Dataset[Row] 的别名。我猜这就是统一接口的成本。
虽然存在一些性能下降,但这两段代码根本不等效。2.0.0+ 中的DataFrame.map签名与 1.x 版本中的签名不同。如果你想让这两者具有可比性,你应该RDD首先转换为:
df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)