删除关闭时间戳的条目

gim*_*770 2 scala apache-spark spark-dataframe

我想删除所有重复条目的记录但是说时间戳的差异可以是任何时间量的偏移量,但为简单起见将使用2分钟.

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:21|ABC  |DEF |
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+
Run Code Online (Sandbox Code Playgroud)

我希望我的数据帧只有行

+-------------------+-----+----+
|Date               |ColA |ColB|
+-------------------+-----+----+
|2017-07-04 18:50:26|ABC  |DEF |
|2017-07-04 18:50:21|ABC  |KLM |
+-------------------+-----+----+
Run Code Online (Sandbox Code Playgroud)

我试过这样的东西,但这并没有删除重复.

    val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"), col("df1.ColA") === col("df2.ColA") &&
      col("df1.ColB") === col("df2.ColB") && 
      && abs(unix_timestamp(col("Date")) - unix_timestamp(col("Date"))) > offset
      )
Run Code Online (Sandbox Code Playgroud)

现在,我只是在这里选择distinct或group by 通过Spark group查找时间戳的最小值 基于某些列的数据上的数据但是我想要一个更健壮的解决方案,原因是该间隔之外的数据可能是有效的数据.此外,根据要求,可以在5s或5分钟内改变偏移.

有人向我提到有关创建UDF比较日期以及所有其他列是否相同但我不确定如何做到这一点,我要么过滤掉行或添加一个标志,然后删除那些行任何帮助都会很大赞赏.

这里有类似的SQL问题带有不同时间戳的重复条目

谢谢!

Ram*_*ami 5

我会这样做:

  1. 定义一个窗口以命令虚拟列上的日期.
  2. 添加一个虚拟列,并为其添加一个常量值.
  3. 添加包含上一条记录日期的新列.
  4. 计算日期和上一个日期之间的差异.
  5. 根据差异的值过滤您的记录.

守则可以是以下内容:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.partitionBy("dummy").orderBy("Date") // step 1

df.withColumn("dummy", lit(1)) // this is step 1
  .withColumn("previousDate", lag($"Date", 1) over w) // step 2
  .withColumn("difference", unix_timestamp($"Date") - unix_timestamp("previousDate")) // step 3
Run Code Online (Sandbox Code Playgroud)

如果您有可能在时间上接近的记录对,则上述解决方案有效.如果您有两个以上的记录,您可以将每个记录与窗口中的第一个记录(不是前一个记录)进行比较,因此lag($"Date",1)您可以使用而不是使用first($"Date").在这种情况下,"差异"列包含当前记录与窗口中第一条记录之间的时间差.