Key*_*r00 3 java csv apache-spark parquet
介绍
我注意到我们项目中的所有推送过滤器都不起作用。它解释了为什么执行时间受到影响,因为它读取了数百万次读取,而本应将其减少到数千次。为了调试问题,我编写了一个小测试,它读取一个 CSV 文件,过滤内容(PushDown Filter)并返回结果。
它不适用于 CSV,因此我尝试读取镶木地板文件。它们都不起作用。
数据
该people.csv文件具有以下结构:
first_name,last_name,city // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
Run Code Online (Sandbox Code Playgroud)
注意:镶木地板文件具有相同的结构
读取 CSV 文件
为了重现这个问题,我编写了一个最小的代码来读取一个 csv 文件并且应该只返回过滤后的数据。
读取 csv 文件并打印物理计划:
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
Run Code Online (Sandbox Code Playgroud)
物理计划:
+-----------+---------+----+
|名字|姓氏|城市|
+-----------+---------+----+
|FirstName1|LastName1|Bern|
+------------+---------+----+== 解析的逻辑计划 == 关系 [first_name#10,last_name#11,city#12] csv
== 分析的逻辑计划 == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv
== 优化逻辑规划 == 关系[first_name#10,last_name#11,city#12] csv
== 物理计划 == *(1) FileScan csv [first_name#10,last_name#11,city#12] 批处理:false,格式:CSV,位置:InMemoryFileIndex[file:people.csv],PartitionFilters:[],PushedFilters : [], ReadSchema: struct
我已经用镶木地板文件进行了测试,但不幸的是结果是一样的。
我们可以注意到的是:
我的问题是:为什么这个 PushedFilters 是空的?
注意:
您正在对第一个数据集调用解释,该数据集只有读取。尝试使用类似的东西(抱歉,我只有 Scala 环境可用):
val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))
f.explain(true)
f.show()
Run Code Online (Sandbox Code Playgroud)
此外,由于这个原因,在使用类型化数据集 API 时要小心。不过不应该是你的情况。