Spark 不推送过滤器(PushedFilters 数组为空)

Key*_*r00 3 java csv apache-spark parquet

介绍

我注意到我们项目中的所有推送过滤器都不起作用。它解释了为什么执行时间受到影响,因为它读取了数百万次读取,而本应将其减少到数千次。为了调试问题,我编写了一个小测试,它读取一个 CSV 文件,过滤内容(PushDown Filter)并返回结果。

它不适用于 CSV,因此我尝试读取镶木地板文件。它们都不起作用。

数据

people.csv文件具有以下结构:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
Run Code Online (Sandbox Code Playgroud)

注意:镶木地板文件具有相同的结构

读取 CSV 文件

为了重现这个问题,我编写了一个最小的代码来读取一个 csv 文件并且应该只返回过滤后的数据。

读取 csv 文件并打印物理计划:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
Run Code Online (Sandbox Code Playgroud)

物理计划:

+-----------+---------+----+
|名字|姓氏|城市|
+-----------+---------+----+
|FirstName1|LastName1|Bern|
+------------+---------+----+

== 解析的逻辑计划 == 关系 [first_name#10,last_name#11,city#12] csv

== 分析的逻辑计划 == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv

== 优化逻辑规划 == 关系[first_name#10,last_name#11,city#12] csv

== 物理计划 == *(1) FileScan csv [first_name#10,last_name#11,city#12] 批处理:false,格式:CSV,位置:InMemoryFileIndex[file:people.csv],PartitionFilters:[],PushedFilters : [], ReadSchema: struct

我已经用镶木地板文件进行了测试,但不幸的是结果是一样的。

我们可以注意到的是:

  • PushedFilters 为空,我希望过滤器包含谓词。
  • 返回的结果仍然是正确的。

我的问题是:为什么这个 PushedFilters 是空的?

注意:

  • 火花版本:2.4.3
  • 文件系统:ext4(和集群上的 HDFS,都没有工作)

Liz*_*ing 6

您正在对第一个数据集调用解释,该数据集只有读取。尝试使用类似的东西(抱歉,我只有 Scala 环境可用):

val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))

f.explain(true)

f.show()
Run Code Online (Sandbox Code Playgroud)

此外,由于这个原因,在使用类型化数据集 API 时要小心。不过不应该是你的情况。