当我将distinct()spark 数据框替换为groupBy(). 但我无法理解其背后的原因。整个目的是从数据框中删除行级重复项。
我试着用搜索引擎的实现groupBy(),并distinct()在pyspark,但未能找到它。
有人可以解释或指出我正确的解释方向吗?
我最近重点关注Apache Spark SQL 中的GROUP BY和操作之间的差异。DISTINCT碰巧...两者有时可能是相同的!
要查看这一点,请运行以下代码并检查执行计划:
(0 to 10).map(id => (s"id#${id}", s"login${id % 25}"))
.toDF("id", "login").createTempView("users")
sparkSession.sql("SELECT login FROM users GROUP BY login").explain(true)
sparkSession.sql("SELECT DISTINCT(login) FROM users").explain(true)
Run Code Online (Sandbox Code Playgroud)
惊喜,惊喜!计划应该是这样的:
== Physical Plan ==
*(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#33]
+- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- *(1) LocalTableScan [login#8]
Run Code Online (Sandbox Code Playgroud)
为什么?由于ReplaceDistinctWithAggregate规则,您应该在日志中看到正在运行的规则:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate ===
!Distinct Aggregate [login#8], [login#8]
+- LocalRelation [login#8] +- LocalRelation [login#8]
(org.apache.spark.sql.catalyst.rules.PlanChangeLogger:65)
Run Code Online (Sandbox Code Playgroud)
============================= 更新:
对于更复杂的查询(例如,使用聚合),这可能是一个差异。
sparkSession.sql("SELECT COUNT(login) FROM users GROUP BY login").explain(true)
sparkSession.sql("SELECT COUNT(DISTINCT(login)) FROM users").explain(true)
Run Code Online (Sandbox Code Playgroud)
该GROUP BY版本生成一个只有一次 shuffle 的计划:
== Physical Plan ==
*(2) HashAggregate(keys=[login#8], functions=[count(login#8)], output=[count(login)#12L])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#16]
+- *(1) HashAggregate(keys=[login#8], functions=[partial_count(login#8)], output=[login#8, count#15L])
+- *(1) LocalTableScan [login#8]
Run Code Online (Sandbox Code Playgroud)
而 with 的版本DISTINCT会生成 2 次随机播放。第一个用于删除重复登录,第二个用于对登录进行计数:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(distinct login#8)], output=[count(DISTINCT login)#17L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#48]
+- *(2) HashAggregate(keys=[], functions=[partial_count(distinct login#8)], output=[count#21L])
+- *(2) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- Exchange hashpartitioning(login#8, 200), ENSURE_REQUIREMENTS, [id=#43]
+- *(1) HashAggregate(keys=[login#8], functions=[], output=[login#8])
+- *(1) LocalTableScan [login#8]
Run Code Online (Sandbox Code Playgroud)
然而,从语义上讲,这些查询并不相同,因为第一个查询生成登录组,而第二个查询也对它们进行计数。它解释了额外的洗牌步骤。
使用更改之前/之后的代码可以更轻松地回答问题。@pri,你有它以便我们可以分析 PySpark 执行的计划吗?
| 归档时间: |
|
| 查看次数: |
4201 次 |
| 最近记录: |