为什么Apache Spark会在嵌套结构中读取不必要的Parquet列?

Pet*_*ens 22 apache-spark parquet spark-dataframe

我的团队正在构建一个ETL过程,使用Spark将原始分隔文本文件加载到基于Parquet的"数据湖"中.Parquet列存储的一个承诺是查询只会读取必要的"列条带".

但是我们看到嵌套模式结构正在读取意外的列.

为了演示,这里有一个使用Scala和Spark 2.0.1 shell的POC:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Run Code Online (Sandbox Code Playgroud)

然后我们将文件读回DataFrame并投影到列的子集:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show
Run Code Online (Sandbox Code Playgroud)

当这个运行时,我们看到预期的输出:

+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+
Run Code Online (Sandbox Code Playgroud)

但是......查询计划显示了一个略有不同的故事:

"优化计划"显示:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet
Run Code Online (Sandbox Code Playgroud)

并且"解释"显示:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>
Run Code Online (Sandbox Code Playgroud)

执行期间生成的INFO日志也确认意外读取了Orig.F2列:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))
Run Code Online (Sandbox Code Playgroud)

根据Dremel论文Parquet文档,复杂嵌套结构的列应独立存储并可独立检索.

问题:

  1. 此行为是当前Spark查询引擎的限制吗?换句话说,Parquet是否支持以最佳方式执行此查询,但Spark的查询规划器是否天真?
  2. 或者,这是当前Parquet实施的限制吗?
  3. 或者,我没有正确使用Spark API吗?
  4. 或者,我是否误解了Dremel/Parquet柱存储应该如何工作?

可能相关:为什么查询性能与Spark SQL中的嵌套列不同?

Ewa*_*ith 5

目前是 Spark 查询引擎的一个限制,相关的 JIRA 票如下,spark 只处理 Parquet 中简单类型的谓词下推,而不是嵌套的 StructTypes

https://issues.apache.org/jira/browse/SPARK-17636


Cai*_*hen 0

该问题自 Spark 2.4.0 起已得到修复。这适用于结构体以及结构体数组。

Spark 3.0.0之前:

设置spark.sql.optimizer.nestedSchemaPruning.enabledtrue

请参阅此处相关的 Jira:https ://issues.apache.org/jira/browse/SPARK-4502

Spark 3.0.0之后:

spark.sql.optimizer.nestedSchemaPruning.enabled现在默认是true

相关 Jira 在这里:https ://issues.apache.org/jira/browse/SPARK-29805

还有相关的SO问题:EfficientreadingnestedparquetcolumninSpark