基于其他列pyspark删除重复记录

Use*_*345 0 apache-spark pyspark

我有一个data framepyspark像下面。

df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  1|   N|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我想在有重复记录时删除记录id并且testN

现在当我查询 new_df

new_df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  2|   Y|
|  3|   N|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚用例。

我已经完成了 groupbyid计数,但它只给出了id列和count.

我做了如下。

grouped_df = new_df.groupBy("id").count()
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到我想要的结果

编辑

我有一个如下所示的数据框。

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   N|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

当我像下面那样做

new_df = df.groupBy("sn").agg(max("attribute").alias("attribute"))
Run Code Online (Sandbox Code Playgroud)

我收到str has no attribute alias错误

预期结果应如下所示

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

hi-*_*zir 5

不是最通用的解决方案,但应该很适合这里:

from pyspark.sql.functions import max

df = spark.createDataFrame(
  [(1, "Y"), (1, "N"), (2, "Y"), (3, "N")], ("id", "test")
)

df.groupBy("id").agg(max("test").alias("test")).show()
# +---+----+         
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+
Run Code Online (Sandbox Code Playgroud)

更通用的一个:

from pyspark.sql.functions import col, count, when

test = when(count(when(col("test") == "Y", "Y")) > 0, "Y").otherwise("N")

df.groupBy("id").agg(test.alias("test")).show()
# +---+----+
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+
Run Code Online (Sandbox Code Playgroud)

可以推广以适应更多的类和非平凡的排序,例如,如果您有三个类Y, ?N按此顺序评估,您可以:

(when(count(when(col("test") == "Y", True)) > 0, "Y")
     .when(count(when(col("test") == "?", True)) > 0, "?")
     .otherwise("N"))
Run Code Online (Sandbox Code Playgroud)

如果您需要保留其他列,这些方法将不起作用,并且您将需要类似于Find maximum row per group in Spark DataFrame 中所示的内容