Ara*_*ram 5 scala apache-spark rdd
scala> val p=sc.textFile("file:///c:/_home/so-posts.xml", 8) //i've 8 cores
p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21
scala> p.partitions.size
res33: Int = 729
Run Code Online (Sandbox Code Playgroud)
我希望打印8个,我在Spark UI中看到729个任务
编辑:
打完电话后repartition(),通过@ zero323建议
scala> p1 = p.repartition(8)
scala> p1.partitions.size
res60: Int = 8
scala> p1.count
Run Code Online (Sandbox Code Playgroud)
我仍然在Spark UI中看到729个任务,即使spark-shell打印8个.
如果你看一下签名
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Run Code Online (Sandbox Code Playgroud)
你会看到你使用的参数被调用minPartitions,这几乎描述了它的功能.在某些情况下,即使这被忽略,但这是另一回事.在幕后使用的输入格式仍决定如何计算分割.
在这种特殊情况下,您可能会使用mapred.min.split.size增加分割大小(这将在加载期间工作)或仅repartition在加载后(这将在加载数据后生效),但通常不需要这样做.
@ zero323钉了它,但我想我会在这个minPartitions输入参数如何影响分区数量上添加更多(低级)背景.
tl; dr partition参数确实SparkContext.textFile对最小(不是确切的!)分区数有影响.
在使用这种特定情况下SparkContext.textFile,分区的数目是通过直接计算org.apache.hadoop.mapred.TextInputFormat.getSplits(jobConf,minPartitions)所使用的textFile.TextInputFormat 只知道如何使用Spark按照建议对分布式数据进行分区(也称为拆分).
来自Hadoop的FileInputFormat的javadoc:
FileInputFormat是所有基于文件的InputFormats的基类.这提供了getSplits(JobConf,int)的通用实现.FileInputFormat的子类还可以覆盖isSplitable(FileSystem,Path)方法,以确保输入文件不会被拆分并由Mappers作为整体进行处理.
这是Spark利用Hadoop API的一个很好的例子.
| 归档时间: |
|
| 查看次数: |
4869 次 |
| 最近记录: |