为什么dataset.count导致shuffle!(火花2.2)

big*_*ann 12 scala apache-spark rdd spark-dataframe

这是我的数据帧:

在此输入图像描述

底层RDD有2个分区

在此输入图像描述 在此输入图像描述

当我做df.count时,产生的DAG是 在此输入图像描述

当我执行df.rdd.count时,生成的DAG是:

在此输入图像描述

:Count是spark中的一个动作,官方定义是'返回DataFrame中的行数'.现在,当我对数据帧执行计数时,为什么会发生洗牌?此外,当我在底层RDD上做同样的事情时,不会发生随机播放.

对我来说无论如何都会发生洗牌是没有意义的.我试图通过这里的计数源代码来解决spark github 但它对我来说没有任何意义."groupby"是否被提供给行动的罪​​魁祸首?

PS.df.coalesce(1).count不会导致任何混乱

Pra*_*rma 6

似乎 DataFrame 的计数操作使用 groupBy 导致 shuffle。下面是来自https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala的代码

* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
}
Run Code Online (Sandbox Code Playgroud)

而如果您查看 RDD 的 count 函数,它将聚合函数传递给每个分区,它将每个分区的总和作为 Array 返回,然后使用 .sum 对数组的元素求和。

此链接的代码片段:https : //github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
Run Code Online (Sandbox Code Playgroud)


nef*_*o_x 5

当 Spark 进行数据帧操作时,它首先计算每个分区的部分计数,然后使用另一个阶段将这些计数加在一起。这对于大型数据帧特别有用,其中将计数分配给多个执行器实际上会提高性能。

验证这一点的地方是Spark UI 的SQL选项卡,其中包含以下某种物理计划描述:

*HashAggregate(keys=[], functions=[count(1)], output=[count#202L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#206L])
Run Code Online (Sandbox Code Playgroud)

  • 我尚未以某种方式验证它(也许访问代码),但我相信 rdd.count 的工作方式是计算各个分区计数并将其发送到驱动程序以进行最终总和 - 所有这一切都发生在一个阶段。 (4认同)