Ale*_*exL 5 hadoop bigdata hdfs apache-spark parquet
我正在使用以下命令将镶木地板文件写入hdfs:
df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)
之后我正在阅读并过滤文件,如下所示:
val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
r.getLong(2), r.getString(3)))
val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()
Run Code Online (Sandbox Code Playgroud)
我希望,Spark会利用文件的分区,只读取"thingId = 1"的分区.事实上,Spark会读取文件的所有分区,而不仅仅是已过滤的分区(具有thingId = 1的分区).如果我查看日志,我可以看到它确实读取了所有内容:
16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-读取Parquet文件189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21读取Parquet文件-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/id=17/part-r-读取Parquet文件00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks.com/path/0833/id读取Parquet文件= 33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs://sandbox.hortonworks读取Parquet文件. com/path/id = 26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:从hdfs读取Parquet文件:/ /sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a2 1-4915-89a7-189c30ca3fe3.gz.parquet
有什么我想念的吗?当我查看文档时,Spark应该根据过滤器知道它应该只读取具有thingID = 1的分区.你们有没有人知道这个问题是什么?
一些问题可能会阻止Spark成功"推下"谓词(即在输入格式级别使用过滤器):
filter-pushdown为OFF:根据您使用的Spark版本,谓词下推选项(spark.sql.parquet.filterPushdown)可能会被关闭.从Spark 1.5.0开始默认为ON - 请检查您的版本和配置
filter是"不透明的":这似乎就是这种情况:你正在加载镶木地板文件,将每一行映射到另一行(重新排序列?),然后使用filter接受函数的方法.Spark无法"读取"功能代码并意识到它在分区列上使用了比较 - 对Spark来说,这只是一个Row => Boolean可以进行各种检查的函数......
要使过滤器下推工作,您需要在将记录映射到与原始结构"分离"的内容之前使用它,并使用其中一个filter使用Spark可解析的过滤器的重载,例如:
// assuming the relevant column name is "id" in the parquet structure
val filtered = file.filter("id = 1")
// or:
val filtered = file.filter(col("id") === 1)
// and only then:
val data = filtered.map(r => Row(...))
Run Code Online (Sandbox Code Playgroud)| 归档时间: |
|
| 查看次数: |
675 次 |
| 最近记录: |