Spark RDD默认分区数

Sri*_*Sri 12 scala apache-spark

版本:Spark 1.6.2,Scala 2.10

我正在执行下面的命令spark-shell.我试图查看Spark默认创建的分区数.

val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4

//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
Run Code Online (Sandbox Code Playgroud)

根据Apache Spark 文档,spark.default.parallelism我的笔记本电脑(2核心处理器)中的核心数量.

我的问题是:rdd2似乎正在给出2个分区的正确结果,如文档中所述.但为什么rdd1将结果作为4个分区?

eli*_*sah 26

最小分区数实际上是由下设置的下限SparkContext.由于spark在引擎盖下使用hadoop,Hadoop InputFormat`仍然是默认行为.

第一种情况应该反映defaultParallelism这里提到的可能会有所不同,具体取决于设置和硬件.(核心数量等)

因此,除非您提供切片数量,否则第一种情况将由描述的数字定义sc.defaultParallelism:

scala> sc.defaultParallelism
res0: Int = 6

scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
Run Code Online (Sandbox Code Playgroud)

对于第二种情况,使用sc.textFile,默认情况下切片的数量是最小分区数.

这是等于2,你可以看到在这部分代码.

因此,您应该考虑以下事项:

  • sc.parallelize将采取numSlicesdefaultParallelism.

  • sc.textFile将取最大值minPartitions和基于hadoop输入分割大小计算的分割数除以块大小.

    • sc.textFile调用sc.hadoopFile,创建一个在引擎盖下HadoopRDD使用InputFormat.getSplits[引用.输入格式文档 ].

    • InputSplit[] getSplits(JobConf job, int numSplits) throws IOException:逻辑分割作业的输入文件集.然后将每个InputSplit分配给单个Mapper进行处理. 注意:拆分是输入的逻辑拆分,输入文件不会物理拆分为块.例如,拆分可能是元组.参数:作业 - 作业配置.numSplits - 所需的分割数,一个提示.返回:作业的InputSplits数组.抛出:IOException.

例:

让我们创建一些虚拟文本文件:

fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt
Run Code Online (Sandbox Code Playgroud)

这将创建2个文件,分别大小为241MB和4GB.

我们可以看到当我们阅读每个文件时会发生什么:

scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> rdd.getNumPartitions
// res0: Int = 8

scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27

scala> rdd2.getNumPartitions
// res1: Int = 128
Run Code Online (Sandbox Code Playgroud)

它们都是HadoopRDDs:

scala> rdd.toDebugString
// res2: String = 
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
//  |  bigfile.txt HadoopRDD[0] at textFile at <console>:27 []

scala> rdd2.toDebugString
// res3: String = 
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
//   |   hugefile.txt HadoopRDD[2] at textFile at <console>:27 []
Run Code Online (Sandbox Code Playgroud)

  • 我还不清楚.对于4 GB的文件和128 MB的块大小.应该给出4096/128 = 32个分区,不是吗?为什么它给了128个分区.另外为什么8个分区为241 MB文件?它应该不是2个分区吗? (6认同)
  • @Abhash Upadhyaya 分区值基于输入拆分大小...输入拆分为 32MB,因此 255/32~8 和 531/32~17 (2认同)