为什么SparkContext.textFile的分区参数不生效?

Ara*_*ram 5 scala apache-spark rdd

scala> val p=sc.textFile("file:///c:/_home/so-posts.xml", 8) //i've 8 cores
p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21

scala> p.partitions.size
res33: Int = 729
Run Code Online (Sandbox Code Playgroud)

我希望打印8个,我在Spark UI中看到729个任务

编辑:

打完电话后repartition(),通过@ zero323建议

scala> p1 = p.repartition(8)
scala> p1.partitions.size
res60: Int = 8
scala> p1.count
Run Code Online (Sandbox Code Playgroud)

我仍然在Spark UI中看到729个任务,即使spark-shell打印8个.

zer*_*323 8

如果你看一下签名

textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] 
Run Code Online (Sandbox Code Playgroud)

你会看到你使用的参数被调用minPartitions,这几乎描述了它的功能.在某些情况下,即使这被忽略,但这是另一回事.在幕后使用的输入格式仍决定如何计算分割.

在这种特殊情况下,您可能会使用mapred.min.split.size增加分割大小(这将在加载期间工作)或仅repartition在加载后(这将在加载数据后生效),但通常不需要这样做.

  • `repartition`将在加载数据后发生.它不会修改`textFile`的行为. (3认同)

Jac*_*ski 8

@ zero323钉了它,但我想我会在这个minPartitions输入参数如何影响分区数量上添加更多(低级)背景.

tl; dr partition参数确实SparkContext.textFile最小(不是确切的!)分区数有影响.

在使用这种特定情况下SparkContext.textFile,分区的数目是通过直接计算org.apache.hadoop.mapred.TextInputFormat.getSplits(jobConf,minPartitions)所使用的textFile.TextInputFormat 知道如何使用Spark按照建议对分布式数据进行分区(也称为拆分).

来自Hadoop的FileInputFormat的javadoc:

FileInputFormat是所有基于文件的InputFormats的基类.这提供了getSplits(JobConf,int)的通用实现.FileInputFormat的子类还可以覆盖isSplitable(FileSystem,Path)方法,以确保输入文件不会被拆分并由Mappers作为整体进行处理.

这是Spark利用Hadoop API的一个很好的例子.

顺便说一下,你可能会发现来源很有启发性;-)