Mar*_*kus 2 scala apache-spark apache-spark-sql
我有以下数据帧df
:
如何删除重复项,同时保持level
每个重复的item_id
和的最小值country_id
。
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 312330| 13535670| 369|
| 312330| 13535670| 376|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 150|
| 311480| 69628930| 138|
| 311480| 69628930| 405|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)
预期输出:
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 138|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)
我知道如何使用 无条件删除重复项dropDuplicates
,但我不知道如何针对我的特定情况执行此操作。
一种方法是使用orderBy
(默认是升序),groupBy
和聚合first
import org.apache.spark.sql.functions.first
df.orderBy("level").groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)
Run Code Online (Sandbox Code Playgroud)
您也可以通过使用.asc
升序和.desc
降序来定义顺序,如下所示
df.orderBy($"level".asc).groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)
Run Code Online (Sandbox Code Playgroud)
您也可以使用window
和row_number
功能进行操作,如下所示
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("item_id", "country_id").orderBy($"level".asc)
import org.apache.spark.sql.functions.row_number
df.withColumn("rank", row_number().over(windowSpec)).filter($"rank" === 1).drop("rank").show()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5185 次 |
最近记录: |