mag*_*nus 3 scala apache-spark
我正在编写一个预处理应用程序,除其他转换和操作外,该应用程序最后将数据集排序,然后再将其写入HDFS。一个新请求要求我对数据集进行重复数据删除,因此我希望在排序的一个阶段中执行此操作。我的理解是,为了有效地进行重复数据删除,排序是必要的(也许我在这方面做错了,没有做太多研究,只是看起来很自然)。
由于某些原因(MapType输出模式中的列),我首先distinct在比进行了较早的阶段进行了测试sort,以为MapType稍后将它们合并以将它们合并在一起。
发生的事情是,跳过了排序的第二阶段,就好像数据集已经被排序一样。这对我来说很有意义,但文档(AFAIK)的任何地方均不支持,而且我也不知道预期的行为是否稳定(我不想将其推向生产阶段只是为了意识到自己突然之间进行2个昂贵的阶段:sort和distinct两个)。是否有人对如何实施sort和/或distinct实施有更多见解?
在Spark中,distinct通常所有聚合操作(例如groupBy)都不会对数据进行排序。我们可以使用该explain功能轻松地进行检查。
// Let's generate a df with 5 elements in [0, 4[ to have at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")
data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
+- *HashAggregate(keys=[r#105L], functions=[])
+- *Project [FLOOR((rand(7842501052366484791) * 5.0)) AS r#105L]
+- *Range (0, 10, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
HashAggregate+ Exchange表示对元素进行散列和混洗,以使具有相同散列的元素位于同一分区中。然后,将具有相同散列的元素进行比较并取消重复。因此,该数据不会在处理后进行排序。让我们检查一下:
data.distinct.show()
+---+
| r|
+---+
| 0|
| 3|
| 2|
+---+
Run Code Online (Sandbox Code Playgroud)
让我们现在解决您对性能的关注。如果您在重复数据删除之后进行排序,则会发生这种情况。
data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- Exchange hashpartitioning(r#227L, 200)
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
我们可以看到,数据经过Exchange hashpartitioning重新整理以进行重复数据删除(),并再次经过重新整理以进行排序(Exchange rangepartitioning)。那很贵。这是由于以下事实:排序需要按范围重新排序,以便相同范围内的元素最终位于同一分区中,然后可以对其进行排序。但是,在进行重复数据删除之前,我们可以变得更聪明,更聪明:
data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
+- *Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
+- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
+- *Range (0, 5, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
仅剩一个交换。实际上,spark知道按范围重新排列后,重复的元素就位于同一分区中。因此,它不会触发新的随机播放。
| 归档时间: |
|
| 查看次数: |
91 次 |
| 最近记录: |