通过最新时间戳对Spark DataFrame中的行进行重复数据删除

Nik*_*Nik 1 inner-join duplicates apache-spark apache-spark-sql

我有DataFrame以下架构:

root
|- documentId
|- timestamp
|- anotherField
Run Code Online (Sandbox Code Playgroud)

例如,

"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"
Run Code Online (Sandbox Code Playgroud)

请注意,为了理解(和方便起见),我将时间戳显示为字符串。实际上,它是long自纪元以来的毫秒数。

如此处所示,存在重复的行(第1行和第3行),它们具有相同documentId但不同的timestamp(可能还有其他字段)。我想timestamp为每个重复数据删除并仅保留最新行(基于)documentId

一种简单的方法df.groupBy("documentId").agg(max("timestamp), ...)似乎不太可能在这里工作,因为我不知道如何将其他字段保留在与满足条件的字段相对应的行中max("timestamp")

因此,我想出了一种复杂的方法。

// first find the max timestamp corresponding to each documentId
val mostRecent = df
    .select("documentId", "timestamp")
      .groupBy("documentId")
        .agg(max("timestamp"))

// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")
Run Code Online (Sandbox Code Playgroud)

此结果dedupedDf应仅具有与每个的最新条目相对应的那些行documentId

尽管此方法可行,但我认为这不是正确(或有效)的方法,因为我使用的join似乎是不必要的。

我该如何做得更好?我正在寻找纯粹的基于“ DataFrame”的解决方案,而不是基于RDD的方法(因为DataBricks的人们在一个研讨会中反复告诉我们要使用DataFrames而不是RDD)。

小智 5

参见下面的代码可以帮助您实现目标,

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
     .where($"rownum" === 1).drop("rownum")

Resultdf.show()
Run Code Online (Sandbox Code Playgroud)

输入:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d1|2018-09-20 10:00:00|       blah1|
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+
Run Code Online (Sandbox Code Playgroud)

输出:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+
Run Code Online (Sandbox Code Playgroud)