Sri*_*Sri 12 scala apache-spark
版本:Spark 1.6.2,Scala 2.10
我正在执行下面的命令spark-shell.我试图查看Spark默认创建的分区数.
val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4
//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
Run Code Online (Sandbox Code Playgroud)
根据Apache Spark 文档,spark.default.parallelism我的笔记本电脑(2核心处理器)中的核心数量.
我的问题是:rdd2似乎正在给出2个分区的正确结果,如文档中所述.但为什么rdd1将结果作为4个分区?
eli*_*sah 26
最小分区数实际上是由下设置的下限SparkContext.由于spark在引擎盖下使用hadoop,Hadoop InputFormat`仍然是默认行为.
第一种情况应该反映defaultParallelism在这里提到的可能会有所不同,具体取决于设置和硬件.(核心数量等)
因此,除非您提供切片数量,否则第一种情况将由描述的数字定义sc.defaultParallelism:
scala> sc.defaultParallelism
res0: Int = 6
scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
Run Code Online (Sandbox Code Playgroud)
对于第二种情况,使用sc.textFile,默认情况下切片的数量是最小分区数.
这是等于2,你可以看到在这部分代码.
因此,您应该考虑以下事项:
sc.parallelize将采取numSlices或defaultParallelism.
sc.textFile将取最大值minPartitions和基于hadoop输入分割大小计算的分割数除以块大小.
sc.textFile调用sc.hadoopFile,创建一个在引擎盖下HadoopRDD使用InputFormat.getSplits[引用.输入格式文档 ].
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException:逻辑分割作业的输入文件集.然后将每个InputSplit分配给单个Mapper进行处理. 注意:拆分是输入的逻辑拆分,输入文件不会物理拆分为块.例如,拆分可能是元组.参数:作业 - 作业配置.numSplits - 所需的分割数,一个提示.返回:作业的InputSplits数组.抛出:IOException.
例:
让我们创建一些虚拟文本文件:
fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt
Run Code Online (Sandbox Code Playgroud)
这将创建2个文件,分别大小为241MB和4GB.
我们可以看到当我们阅读每个文件时会发生什么:
scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> rdd.getNumPartitions
// res0: Int = 8
scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> rdd2.getNumPartitions
// res1: Int = 128
Run Code Online (Sandbox Code Playgroud)
它们都是HadoopRDDs:
scala> rdd.toDebugString
// res2: String =
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
// | bigfile.txt HadoopRDD[0] at textFile at <console>:27 []
scala> rdd2.toDebugString
// res3: String =
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
// | hugefile.txt HadoopRDD[2] at textFile at <console>:27 []
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
17237 次 |
| 最近记录: |