Spark是否支持对S3中的镶木地板文件进行真正的列扫描?

con*_*lee 31 amazon-s3 apache-spark parquet apache-spark-sql

Parquet数据存储格式的一大好处是它是柱状的.如果我有一个包含数百列的'宽'数据集,但我的查询只涉及其中的一些,那么它可能只读取存储这几列的数据,并跳过其余的.

据推测,此功能的工作原理是在镶木地板文件的头部读取一些元数据,指示每列的文件系统上的位置.然后,读者可以在磁盘上寻找只读必要的列.

有谁知道spark的默认镶木地板阅读器是否正确地在S3上实现了这种选择性搜索?我认为它得到了S3的支持,但理论支持和正确利用该支持的实现之间存在很大差异.

Ste*_*ran 11

这需要分解

  1. Parquet代码是否从spark获得谓词(是)
  2. 然后,实木复合地板是否尝试使用Hadoop FileSystem seek()+ read()readFully(position, buffer, length)调用来有选择地只读取那些列?是
  3. S3连接器是否将这些文件操作转换为高效的HTTP GET请求?在亚马逊EMR:是的.在Apache Hadoop中,您需要在类路径上使用hadoop 2.8并正确设置spark.hadoop.fs.s3a.experimental.fadvise=random以触​​发随机访问.

Hadoop 2.7及更早版本处理攻击性极好的seek(),因为它们始终启动GET偏移结束文件,对下一次搜索感到惊讶,必须中止该连接,重新打开新的TCP/HTTPS 1.1连接(慢,CPU很重),反复做一遍.随机IO操作会对.csv.gz等大量加载造成伤害,但对于获得ORC/Parquet性能至关重要.

你没有得到Hadoop 2.7的hadoop-aws JAR的加速.如果你需要它,你需要更新hadoop*.jar和依赖项,或者从头开始针对Hadoop 2.8构建Spark

请注意,Hadoop 2.8+还有一个很好的小功能:如果您toString()在日志语句中调用S3A文件系统客户端,它会打印出所有文件系统IO统计信息,包括在搜索中丢弃了多少数据,中止了TCP连接和c.帮助您了解正在发生的事情.

2018-04-13警告::不要尝试将Hadoop 2.8+ hadoop-awsJAR与hadoop-2.7 JAR集的其余部分一起放在类路径上,并期望看到任何加速.您将看到的只是堆栈跟踪.您需要更新所有hadoop JAR及其传递依赖项.


Jac*_*ski 9

免责声明:我没有明确的答案,也不想作为权威来源,但是花了一些时间在Spark 2.2+的镶木地板支持上,我希望我的回答可以帮助我们所有人更接近正确答案.


S3上的Parquet是否避免从S3中提取未使用列的数据,只检索它需要的文件块,还是拉出整个文件?

我使用今天我从主人那里建立的Spark 2.3.0-SNAPSHOT.

parquet数据源格式由ParquetFileFormat处理,它是一个FileFormat.

如果我是正确的,读取部分由buildReaderWithPartitionValues方法处理(覆盖它们FileFormat).

buildReaderWithPartitionValuesFileSourceScanExec为所谓的输入RDD请求物理运算符时专门使用,这些RDD实际上是一个RDD,用于在WholeStageCodegenExec执行时生成内部行.

话虽如此,我认为回顾一下buildReaderWithPartitionValues可能会让我们更接近最终答案.

当你看到这条线时,你可以放心,我们正走在正确的轨道上.

//尝试在启用过滤器下推时按下过滤器.

该代码路径取决于默认情况下启用的spark.sql.parquet.filterPushdownSpark属性.

spark.sql.parquet.filterPushdown设置为true时启用Parquet过滤器下推优化.

如果定义了过滤器,那将导致我们使用parquet-hadoop的ParquetInputFormat.setFilterPredicate .

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
Run Code Online (Sandbox Code Playgroud)

当代码回落到parquet-mr(而不是使用所谓的矢量化镶木地板解码读取器)时,使用过滤器时,代码会变得更有趣.这是我不太了解的部分(除了我在代码中可以看到的).

请注意,矢量化镶木地板解码阅读器由spark.sql.parquet.enableVectorizedReaderSpark属性控制,该属性默认情况下处于启用状态.

提示:要了解if表达式的哪个部分,请启用DEBUG记录org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat器的日志记录级别.

要查看所有下推过滤器,您可以打开INFO记录org.apache.spark.sql.execution.FileSourceScanExec器的日志记录级别.您应该在日志中看到以下内容:

INFO Pushed Filters: [pushedDownFilters]
Run Code Online (Sandbox Code Playgroud)

我确实希望,如果它不是一个明确的答案,它已经帮助了一点,有人在我离开的地方捡起它,很快就把它变成了一个.希望死了 :)


归档时间:

查看次数:

2635 次

最近记录:

7 年,10 月 前