Qma*_*age 8 apache-spark apache-spark-sql pyspark
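When using the dropDuplicates function on a Spark DataFrame, which row is kept? The Spark documentation does not say.
P.S. Assume a distributed YARN environment (not a local master).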
Jac*_*ski 11
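TL;DR It keeps the first row (according to row order).
The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.
Spark SQL's Catalyst Optimizer then translates that Deduplicate operator into the First logical operator, which answers your question nicely (!)
You can see the Deduplicate operator in the logical plan below.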
// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
The following is the logical plan of the q dataset.
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))
The Deduplicate operator is then translated into the First logical operator (which shows up as an Aggregate operator after optimization).
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))
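The same rewrite can also be checked programmatically instead of by reading the printed trees. The following is only a sketch under a few assumptions: it runs against a local master, it builds the duplicates with selectExpr rather than the typed map used above, and it relies on the internal Catalyst classes Deduplicate and Aggregate, which are not a stable public API and may differ between Spark versions. The object name DeduplicatePlanCheck is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate}

// Hypothetical object name, used only for this sketch.
object DeduplicatePlanCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: a local master is enough to inspect query plans.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dedup-plan-check")
      .getOrCreate()

    // Nine rows holding only the values 0, 1 and 2.
    val dups = spark.range(9).selectExpr("id % 3 AS value")
    val q = dups.dropDuplicates()

    // Collect the matching nodes from the plan before and after optimization.
    val dedupNodes = q.queryExecution.logical.collect { case d: Deduplicate => d }
    val aggNodes = q.queryExecution.optimizedPlan.collect { case a: Aggregate => a }

    println(s"Deduplicate nodes in the logical plan: ${dedupNodes.size}")
    println(s"Aggregate nodes in the optimized plan: ${aggNodes.size}")

    spark.stop()
  }
}
```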
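After spending some time reviewing Apache Spark's code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.
first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.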
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
   +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
      +- *SerializeFromObject [input[0, bigint, false] AS value#64L]
         +- *MapElements <function1>, obj#63: bigint
            +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
               +- *Range (0, 9, step=1, splits=8)
I also think the dropDuplicates operator is likely to perform better.
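To make the groupBy/first equivalence concrete on a dataset with more than one column, here is a minimal sketch. The people dataset, its id and name columns, and the object name are made up for illustration, and a local master is assumed. The Spark documentation for first notes that its result depends on row order, which may be non-deterministic after a shuffle, so the sketch only compares row counts and does not claim which name survives for a given id.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

// Hypothetical object name, used only for this sketch.
object DropDuplicatesVsFirst {
  def main(args: Array[String]): Unit = {
    // Assumption: a local master; on YARN only the master setting would differ.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dedup-vs-first")
      .getOrCreate()
    import spark.implicits._

    // Made-up data: id 1 appears twice with different names.
    val people = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "name")

    // Deduplicate on the id column only.
    val viaDropDuplicates = people.dropDuplicates("id")

    // The formulation described above: group by the key, take first of the rest.
    val viaGroupBy = people.groupBy("id").agg(first("name") as "name")

    // Both results hold one row per distinct id.
    println(s"dropDuplicates rows: ${viaDropDuplicates.count()}")
    println(s"groupBy + first rows: ${viaGroupBy.count()}")

    spark.stop()
  }
}
```

If a specific row must win, one option is to impose an explicit ordering before picking it, rather than relying on whatever order the rows happen to arrive in.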