Spark SQL如何决定从Hive表加载数据时将使用的分区数?

mor*_*eus 12 apache-spark-sql

此问题与通过从Hive表读取数据创建的spark数据帧的分区数相同

但我认为这个问题得不到正确答案.请注意,问题是当使用SparkSession.sql方法对HIVE表执行sql查询而创建数据帧时,将询问将创建多少个分区.

IIUC,上面的问题不同于在执行某些代码时创建数据帧时会创建多少个分区,这些代码spark.read.json("examples/src/main/resources/people.json")直接从文件系统加载数据 - 可能是HDFS.我认为后一个问题的答案是由spark.sql.files.maxPartitionBytes给出的

spark.sql.files.maxPartitionBytes 134217728(128 MB)读取文件时打包到单个分区的最大字节数.

在实验上,我尝试从HIVE表创建一个数据帧,并且没有解释我得到的分区数 total data in hive table / spark.sql.files.maxPartitionBytes

同时添加到OP,最好知道如何控制分区的数量,即,当一个人想要强制使用与默认情况下不同的数字时使用火花.

参考文献:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

mor*_*eus 8

TL; DR:从Hive读取数据时的默认分区数将由HDFS blockSize控制.分区的数目可以增加通过设置mapreduce.job.maps到合适的值,并且可以通过设置能够降低mapreduce.input.fileinputformat.split.minsize到适当的值

从hive表加载数据时,Spark SQL会创建HadoopRDD的实例.

一种RDD,使用旧的MapReduce API(org.apache.hadoop.mapred)提供读取存储在Hadoop中的数据的核心功能(例如,HDFS中的文件,HBase或S3中的源).

在此输入图像描述

HadoopRDD依次根据org.apache.hadoop.mapreduce.lib.input.FileInputFormat(新API)和org.apache.hadoop.mapred.FileInputFormat(旧API)中computeSplitSize定义的方法拆分输入文件.

新API:

protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
Run Code Online (Sandbox Code Playgroud)

旧API:

protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
Run Code Online (Sandbox Code Playgroud)

computeSplitSize根据HDFS blockSize拆分文件,但是如果blockSize小于minSize或大于maxSize那么它被限制在那些极端.可以从中获取HDFS blockSize

hdfs getconf -confKey dfs.blocksize
Run Code Online (Sandbox Code Playgroud)

根据Hadoop最终指南表8.5,它minSize是从中获得的mapreduce.input.fileinputformat.split.minsize并且maxSize是从中获得的mapreduce.input.fileinputformat.split.maxsize.

在此输入图像描述

但是,该书还提到了mapreduce.input.fileinputformat.split.maxsize这一点:

旧的MapReduce API中不存在此属性(CombineFileInputFormat除外).相反,它间接计算为作业总输入的大小除以mapreduce.job.maps(或JobConf上的setNumMapTasks()方法)指定的map任务的指导数.

这篇文章还使用总输入大小除以map任务的数量来计算maxSize.

  • 新版本的答案是否仍然有效? (2认同)