将镶木地板文件加载到Spark中的case类的性能

jrj*_*rjd 7 scala apache-spark parquet apache-spark-sql

我正在评估在Spark中加载Parquet文件的不同方法的性能,差异是惊人的.

在我们的Parquet文件中,我们有类似的嵌套case类:

case class C(/* a dozen of attributes*/)
case class B(/* a dozen of attributes*/, cs: Seq[C])
case class A(/* a dozen of attributes*/, bs: Seq[B])
Run Code Online (Sandbox Code Playgroud)

从Parquet文件加载它们需要一段时间.所以我已经完成了从Parquet文件加载案例类的不同方法的基准,并使用Spark 1.6和2.0对一个字段求​​和.

以下是我做的基准测试的总结:

val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist()
df.count()

// Spark 1.6

// Play Json
// 63.169s
df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption)
         .map(_.fieldToSum).sum()

// Direct access to field using Spark Row
// 2.811s
df.map(row => row.getAs[Long]("fieldToSum")).sum()

// Some small library we developed that access fields using Spark Row
// 10.401s
df.toRDD[A].map(_.fieldToSum).sum()

// Dataframe hybrid SQL API
// 0.239s
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)

// Dataset with column selection
// 0.176s
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)


// Spark 2.0

// Performance is similar except for:

// Direct access to field using Spark Row
// 23.168s
df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)

// Some small library we developed that access fields using Spark Row
// 32.898s
f1DF.toRDD[A].map(_.fieldToSum).sum()
Run Code Online (Sandbox Code Playgroud)

我理解为什么升级到Spark 2.0时使用Spark Row的方法的性能会降低,因为Dataframe现在它只是一个别名Dataset[Row].我想这就是统一接口的成本.

另一方面,我很失望Dataset没有保留承诺:使用RDD样式编码(maps和flatMaps)时的性能比使用Dataset类似DataframeSQL的DSL 时更差.

基本上,为了获得良好的性能,我们需要放弃类型安全.

  1. Dataset使用RDD和Dataset用作这种差异的原因是Dataframe什么?

  2. 有没有办法提高编码性能,Dataset以等同RDD风格的编码和SQL风格的编码性能?对于数据工程,使用RDD样式编码要清晰得多.

  3. 此外,使用类似SQL的DSL需要展平我们的数据模型,而不是使用嵌套的case类.我是对的,只有平面数据模型才能实现良好的性能?

zer*_*323 3

用作 RDD 的 Dataset 和用作 Dataframe 的 Dataset 之间存在这种差异的原因是什么?

为了获得一些见解,让我们思考一下 Spark SQL 使用的优化。据我所知,与 plain 相比,有三种类型的改进RDD

  • 执行计划优化(预测和选择下推、不断折叠),
  • 本机(堆外)内存使用和高效的列式缓存格式,
  • 代码生成。

现在的问题是,并非所有这些技术在受限编程模型(例如 SQL)之外都有用。

例如,可以下推选择(过滤器),但投影非常有限(您不能真正拥有对象的一部分,可以吗?)。类似地,代码生成依赖于明确定义的语义,并且不容易普遍应用(它基本上是一个编译器,生成可以由 JVM 进一步优化的代码)。

Finalsun.misc.Unsafe是一种提高性能的神奇方法,但它并不是免费的。虽然这里有很多增益,但编码和解码的开销也很大。

使用类似 SQL 的 DSL 需要扁平化我们的数据模型并且不使用嵌套的 case 类。

嵌套结构并不完全是一等公民,并且存在一些记录不足的限制,您仍然可以在这里做很多事情。

升级到 Spark 2.0 时,使用 Spark Row 的方法的性能会下降,因为 Dataframe 现在只是 Dataset[Row] 的别名。我猜这就是统一接口的成本。

虽然存在一些性能下降,但这两段代码根本不等效。2.0.0+ 中的DataFrame.map签名与 1.x 版本中的签名不同。如果你想让这两者具有可比性,你应该RDD首先转换为:

df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)