Which row is kept by the dropDuplicates operator?

Qma*_*age 8 apache-spark apache-spark-sql pyspark

When using the dropDuplicates function on a Spark DataFrame, which row is kept? The Spark documentation does not say.

  1. Keeps the first one (according to row order)?
  2. Keeps the last one (according to row order)?
  3. Random?

PS: Assume a distributed YARN environment (not a local master).
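
To make the question concrete, here is a minimal sketch of the situation (column names and values are made up purely for illustration; deduplication is on a subset of columns so that the surviving row actually differs):

// minimal illustration, e.g. in spark-shell; spark.implicits._ provides toDF
import spark.implicits._

val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "payload")

// exactly one row per id survives -- but is it (1, "a"), (1, "b"), or either?
df.dropDuplicates("id").show()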

Jac*_*ski 11

TL;DR Keeps the first one (according to row order).

The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.

That Deduplicate operator is translated into a First logical operator by Spark SQL's Catalyst Optimizer, which nicely answers your question (!)

You can see the Deduplicate operator in the logical plan below.

// create a dataset with duplicates
val dups = spark.range(9).map(_ % 3)

val q = dups.dropDuplicates

Below is the logical plan of the q dataset.

scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

The Deduplicate operator is then translated into a First logical operator (which shows up as an Aggregate operator after optimization).

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

After spending some time reviewing the Apache Spark code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.

first(columnName: String, ignoreNulls: Boolean): Column — Aggregate function: returns the first value of a column in a group.

import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
   +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
      +- *SerializeFromObject [input[0, bigint, false] AS value#64L]
         +- *MapElements <function1>, obj#63: bigint
            +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
               +- *Range (0, 9, step=1, splits=8)

I also think the dropDuplicates operator may be the more performant choice.
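
One way to check this yourself (a sketch; the exact plan text depends on the Spark version in use) is to compare the physical plans of the two formulations side by side:

import org.apache.spark.sql.functions.first

// dropDuplicates: a plain HashAggregate on the grouping key, no first(...) call
dups.dropDuplicates.explain()

// groupBy + first: the same grouping, plus an explicit first(value) aggregate
dups.groupBy("value").agg(first("value") as "value").explain()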

  • Spark's groupBy() and first() aggregations do not preserve ordering. If that is how drop_duplicates is implemented, you should not count on any ordering being preserved. (2 upvotes)
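
As a side note on the comment above: if a deterministic "first row per key" is needed, one option is to make the ordering explicit with a window function instead of relying on dropDuplicates. This is only a sketch, with hypothetical columns key and ts that do not appear in the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// rank rows within each key by an explicit ordering column, then keep rank 1
val w = Window.partitionBy("key").orderBy("ts")
val firstPerKey = df                       // df: hypothetical DataFrame with key and ts columns
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")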