Pet*_*bčo 4 scala apache-spark apache-spark-sql
我知道有几个转换可以保留父分区(如果之前设置过 - 例如mapValues),而有些转换不保留它(例如map)。
我使用 Spark 2.2 的数据集 API。我的问题是 -dropDuplicates转换是否保留分区?想象一下这段代码:
case class Item(one: Int, two: Int, three: Int)
import session.implicits._
val ds = session.createDataset(List(Item(1,2,3), Item(1,2,3)))
val repart = ds.repartition('one, 'two).cache()
repart.dropDuplicates(List("one", "two")) // will be partitioning preserved?
Run Code Online (Sandbox Code Playgroud)
通常,dropDuplicates会进行洗牌(因此不会保留分区),但在您的特殊情况下,它不会进行额外的洗牌,因为您已经以优化器考虑的合适形式对数据集进行了分区:
repart.dropDuplicates(List("one","two")).explain()
== Physical Plan ==
*HashAggregate(keys=[one#3, two#4, three#5], functions=[])
+- *HashAggregate(keys=[one#3, two#4, three#5], functions=[])
+- InMemoryTableScan [one#3, two#4, three#5]
+- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange hashpartitioning(one#3, two#4, 200)
+- LocalTableScan [one#3, two#4, three#5]
Run Code Online (Sandbox Code Playgroud)
在这里寻找的关键字是: Exchange
但请考虑以下代码,您首先使用 plain 对数据集进行重新分区repartition():
val repart = ds.repartition(200).cache()
repart.dropDuplicates(List("one","two")).explain()
Run Code Online (Sandbox Code Playgroud)
这确实会触发额外的 shuffle(现在你有 2 个Exchange步骤):
== Physical Plan ==
*HashAggregate(keys=[one#3, two#4], functions=[first(three#5, false)])
+- Exchange hashpartitioning(one#3, two#4, 200)
+- *HashAggregate(keys=[one#3, two#4], functions=[partial_first(three#5, false)])
+- InMemoryTableScan [one#3, two#4, three#5]
+- InMemoryRelation [one#3, two#4, three#5], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- Exchange RoundRobinPartitioning(200)
+- LocalTableScan [one#3, two#4, three#5]
Run Code Online (Sandbox Code Playgroud)
注意:我在 Spark 2.1 中检查过,它在 Spark 2.2 中可能有所不同,因为优化器在 Spark 2.2(基于成本的优化器)中发生了变化