big*_*ann 12 scala apache-spark rdd spark-dataframe
这是我的数据帧:
底层RDD有2个分区
当我执行df.rdd.count时,生成的DAG是:
问:Count是spark中的一个动作,官方定义是'返回DataFrame中的行数'.现在,当我对数据帧执行计数时,为什么会发生洗牌?此外,当我在底层RDD上做同样的事情时,不会发生随机播放.
对我来说无论如何都会发生洗牌是没有意义的.我试图通过这里的计数源代码来解决spark github 但它对我来说没有任何意义."groupby"是否被提供给行动的罪魁祸首?
PS.df.coalesce(1).count不会导致任何混乱
似乎 DataFrame 的计数操作使用 groupBy 导致 shuffle。下面是来自https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala的代码
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) {
plan =>
plan.executeCollect().head.getLong(0)
}
Run Code Online (Sandbox Code Playgroud)
而如果您查看 RDD 的 count 函数,它将聚合函数传递给每个分区,它将每个分区的总和作为 Array 返回,然后使用 .sum 对数组的元素求和。
此链接的代码片段:https : //github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
Run Code Online (Sandbox Code Playgroud)
当 Spark 进行数据帧操作时,它首先计算每个分区的部分计数,然后使用另一个阶段将这些计数加在一起。这对于大型数据帧特别有用,其中将计数分配给多个执行器实际上会提高性能。
验证这一点的地方是Spark UI 的SQL选项卡,其中包含以下某种物理计划描述:
*HashAggregate(keys=[], functions=[count(1)], output=[count#202L])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#206L])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2548 次 |
| 最近记录: |