Spark没有利用镶木地板的hdfs分区

Ale*_*exL 5 hadoop bigdata hdfs apache-spark parquet

我正在使用以下命令将镶木地板文件写入hdfs: df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)

之后我正在阅读并过滤文件,如下所示:

val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
    r.getLong(2), r.getString(3)))

val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()
Run Code Online (Sandbox Code Playgroud)

我希望,Spark会利用文件的分区,只读取"thingId = 1"的分区.事实上,Spark会读取文件的所有分区,而不仅仅是已过滤的分区(具有thingId = 1的分区).如果我查看日志,我可以看到它确实读取了所有内容:

16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-读取Parquet文件189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21读取Parquet文件-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=17/part-r-读取Parquet文件00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/0833/id读取Parquet文件= 33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks读取Parquet文件. com/path/id = 26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs读取Parquet文件:/ /sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a2 1-4915-89a7-189c30ca3fe3.gz.parquet

有什么我想念的吗?当我查看文档时,Spark应该根据过滤器知道它应该只读取具有thingID = 1的分区.你们有没有人知道这个问题是什么?

Tza*_*har 6

一些问题可能会阻止Spark成功"推下"谓词(即在输入格式级别使用过滤器):

  1. filter-pushdown为OFF:根据您使用的Spark版本,谓词下推选项(spark.sql.parquet.filterPushdown)可能会被关闭.从Spark 1.5.0开始默认为ON - 请检查您的版本和配置

  2. filter是"不透明的":这似乎就是这种情况:你正在加载镶木地板文件,将每一行映射到另一行(重新排序列?),然后使用filter接受函数的方法.Spark无法"读取"功能代码并意识到它在分区列上使用了比较 - 对Spark来说,这只是一个Row => Boolean可以进行各种检查的函数......

    要使过滤器下推工作,您需要在将记录映射到与原始结构"分离"的内容之前使用它,并使用其中一个filter使用Spark可解析的过滤器的重载,例如:

    // assuming the relevant column name is "id" in the parquet structure
    val filtered = file.filter("id = 1") 
    
    // or:
    val filtered = file.filter(col("id") === 1) 
    
    // and only then:
    val data = filtered.map(r => Row(...))
    
    Run Code Online (Sandbox Code Playgroud)