Paw*_*nko 5 scala distributed-computing bigdata apache-spark
在将RDD写入文件之前执行以下转换之间有什么区别?
代码示例:
val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)
mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)
Run Code Online (Sandbox Code Playgroud)
它与collect()相比如何?我完全知道Spark保存方法会将它存储为HDFS风格的结构,但是我对collect()和shuffled/non-shuffled coalesce()的数据分区方面更感兴趣.
shuffle = true和shuffle = false在结果输出中不会有任何实际差异,因为它们都会下降到单个分区.但是,当你将它设置为true时,你将进行一次无用的随机播放.使用shuffle = true,输出均匀分布在分区之间(如果需要,还可以增加分区数),但由于目标是1分区,所以一切都在一个分区中结束.
至于与collect()的比较,区别在于所有数据都存储在单个执行器而不是驱动程序上.
coalesce(n, shuffle = true)这也相当于可能对您的作业repartition(n)执行方式产生相当大的影响,具体取决于您在父 RDD 中拥有的映射或任何其他处理登录名。
一般来说,当父分区中的数据分布均匀并且您没有大幅减少分区数量时,在使用.shuffle 时应避免使用shufflecoalesce。
但是,在您的情况下,这会大大减少分区数量,并且根据文档
然而,如果您正在进行剧烈合并,例如 numPartitions = 1,这可能会导致您的计算发生在比您希望的更少的节点上(例如,在 numPartitions = 1 的情况下为一个节点)。为了避免这种情况,您可以传递 shuffle = true。这将添加一个洗牌步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)
鉴于此,现在您需要正确评估并做出选择
例如,考虑以下片段,这些片段与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情
// fast
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = true)
.toDF.write.text("shuffleTrue")
Run Code Online (Sandbox Code Playgroud)
// slow
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = false)
.toDF.write.text("shuffleFalse")
Run Code Online (Sandbox Code Playgroud)
在我的集群上,10 个任务的shuffle = true总时间约为5 秒,在每个父分区上并行执行计算逻辑。另一个则需要大约50 秒在一个执行器上的单个任务中完成所有计算。shuffle = false
| 归档时间: |
|
| 查看次数: |
5401 次 |
| 最近记录: |