Spark中的快速镶木地板行数

Dan*_*bos 11 apache-spark parquet

Parquet文件包含每块行计数字段.Spark似乎在某些时候读取它(SpecificParquetRecordReaderBase.java#L151).

我试过这个spark-shell:

sqlContext.read.load("x.parquet").count
Run Code Online (Sandbox Code Playgroud)

Spark分为两个阶段,显示了DAG中的各种聚合步骤.我认为这意味着它正常读取文件而不是使用行计数.(我可能是错的.)

问题是:当我运行时Spark是否已经使用行计数字段count?是否有其他API可以使用这些字段?出于某种原因,依赖这些领域是个坏主意吗?

Den*_*Lee 13

这是正确的,Spark在您运行时已经在使用rowcounts字段count.

稍微深入了解一下细节,当使用平面模式提交作为[SPARK-11787]加速平面模式的镶木地板阅读器的一部分,SpecificParquetRecordReaderBase.java引用了改进的Parquet扫描性能.注意,此提交包含在Spark 1.6分支中.

如果查询是行计数,它几乎按照您描述的方式工作(即读取元数据).如果谓词完全满足最小值/最大值,那么它应该也能正常工作,尽管它没有完全验证.使用这些Parquet字段并不是一个坏主意,但正如前面的声明所暗示的那样,关键问题是确保谓词过滤与元数据匹配,这样您就可以进行准确的计数.

为了帮助理解为什么有两个阶段,这里是运行count()语句时创建的DAG.

在此输入图像描述

在深入挖掘两个阶段时,请注意第一个阶段(阶段25)正在运行文件扫描,而第二个阶段(阶段26)运行洗牌以进行计数.

在此输入图像描述 在此输入图像描述

感谢Nong Li(SpecificParquetRecordReaderBase.java提交的作者)进行验证!

 

更新

为了在Dataset.countParquet和Parquet 之间的桥梁上提供额外的上下文,围绕它的内部逻辑的流程是:

  • Spark不会读取任何Parquet列来计算计数
  • 将Parquet架构传递给VectorizedParquetRecordReader它实际上是一个空的Parquet消息
  • 使用存储在Parquet文件页脚中的元数据计算计数.涉及在迭代器中包含上述内容,该迭代器返回InternalRow每个InternalRow.scala.

为了在内部使用Parquet文件格式,Apache Spark使用返回an的迭代器来包装逻辑InternalRow; 更多信息可以在InternalRow.scala中找到.最终,count()聚合函数使用此迭代器与底层Parquet数据源交互.顺便说一下,矢量化和非矢量化的Parquet读取器都是如此.

因此,要Dataset.count()与Parquet阅读器桥接,路径是:

  • Dataset.count()调用计划在具有单个count()聚合函数的聚合运算符中.
  • 在计划时为聚合运算符生成Java代码以及count()聚合函数.
  • 生成的Java代码与底层数据源ParquetFileFormat与RecordReaderIterator交互,RecordReaderIterator由Spark数据源API在内部使用.

有关更多信息,请参阅Parquet Count元数据说明.

  • 谢谢你非常详细的答案!当你说_"它几乎按照你所描述的方式工作"时_你的意思是它扫描整个文件,还是看看Parquet文件中的元数据?(我担心我在问题中描述了两种可能性......)同样适用于_"这是正确的"_在开始时:).谢谢你的任何澄清! (2认同)