spark:如何在保持最高时间戳行的同时对数据帧执行dropDuplicates

arn*_*che 8 dataframe apache-spark pyspark spark-dataframe

我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的'id'字段),同时保持具有最高'timestamp'(unix timestamp)字段的行.

我找到了drop_duplicate方法(我正在使用pyspark),但是没有人控制将保留哪个项目.

有人可以帮忙吗?Thx提前

Dav*_*vid 9

可能需要手动映射和缩减来提供所需的功能.

def selectRowByTimeStamp(x,y):
    if x.timestamp > y.timestamp:
        return x
    return y

dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp) 
Run Code Online (Sandbox Code Playgroud)

这里我们根据id对所有数据进行分组.然后,当我们减少分组时,我们通过保留具有最高时间戳的记录来实现.当代码完成reduce时,每个id只剩下1条记录.


Dav*_*fin 5

你可以这样做:

val df = Seq(
  (1,12345678,"this is a test"),
  (1,23456789, "another test"),
  (2,2345678,"2nd test"),
  (2,1234567, "2nd another test")
).toDF("id","timestamp","data")

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+

df.join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp|        data|
+---+---------+------------+
|  1| 23456789|another test|
|  2|  2345678|    2nd test|
+---+---------+------------+
Run Code Online (Sandbox Code Playgroud)

如果您希望可以重复timestamp一次id(请参阅下面的评论),您可以这样做:

df.dropDuplicates(Seq("id", "timestamp")).join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
Run Code Online (Sandbox Code Playgroud)