Apache Spark中的混乱与非混乱合并

Paw*_*nko 5 scala distributed-computing bigdata apache-spark

在将RDD写入文件之前执行以下转换之间有什么区别?

  1. coalesce(1,shuffle = true)
  2. coalesce(1,shuffle = false)

代码示例:

val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)

mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)
Run Code Online (Sandbox Code Playgroud)

它与collect()相比如何?我完全知道Spark保存方法会将它存储为HDFS风格的结构,但是我对collect()和shuffled/non-shuffled coalesce()的数据分区方面更感兴趣.

Hol*_*den 6

shuffle = true和shuffle = false在结果输出中不会有任何实际差异,因为它们都会下降到单个分区.但是,当你将它设置为true时,你将进行一次无用的随机播放.使用shuffle = true,输出均匀分布在分区之间(如果需要,还可以增加分区数),但由于目标是1分区,所以一切都在一个分区中结束.

至于与collect()的比较,区别在于所有数据都存储在单个执行器而不是驱动程序上.

  • 我不确定这是对的。我已经看到对输出分区数量的限制推高了 RDD 计划结构以强制其他操作也使用较低数量(在本例中为 1)的分区。这通常不是您想要的。 (2认同)

kas*_*sur 5

coalesce(n, shuffle = true)这也相当于可能对您的作业repartition(n)执行方式产生相当大的影响,具体取决于您在父 RDD 中拥有的映射或任何其他处理登录名。

一般来说,当父分区中的数据分布均匀并且您没有大幅减少分区数量时,在使用.shuffle 时应避免使用shufflecoalesce

但是,在您的情况下,这会大大减少分区数量,并且根据文档

然而,如果您正在进行剧烈合并,例如 numPartitions = 1,这可能会导致您的计算发生在比您希望的更少的节点上(例如,在 numPartitions = 1 的情况下为一个节点)。为了避免这种情况,您可以传递 shuffle = true。这将添加一个洗牌步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)

鉴于此,现在您需要正确评估并做出选择

  • 混洗潜在的大量数据,但在父分区中并行计算
  • 将所有分区收集到一个中,无需完全重新整理(当然仍然会有数据移动),但在单个任务中进行计算

例如,考虑以下片段,这些片段与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情

// fast
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = true)
  .toDF.write.text("shuffleTrue")
Run Code Online (Sandbox Code Playgroud)
// slow
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = false)
  .toDF.write.text("shuffleFalse")
Run Code Online (Sandbox Code Playgroud)

在我的集群上,10 个任务的shuffle = true总时间约为5 秒,在每个父分区上并行执行计算逻辑。另一个则需要大约50 秒在一个执行器上的单个任务中完成所有计算shuffle = false

  • 在你的段落中,哪一个是“shuffle = False”? (2认同)