基于列单调增加ID

vik*_*kky 1 scala apache-spark apache-spark-sql

我正在尝试将新列添加到我的spark DF。我了解可以使用以下代码:

df.withColumn("row",monotonically_increasing_id)
Run Code Online (Sandbox Code Playgroud)

但是我的用例是:

输入DF:

col value
  1
  2
  2
  3
  3
  3
Run Code Online (Sandbox Code Playgroud)

输出DF:

col_value      identifier
  1               1
  2               1
  2               2
  3               1
  3               2
  3               3
Run Code Online (Sandbox Code Playgroud)

关于使用monotonically_increasing或rowWithUniqueIndex进行获取的任何建议。

Leo*_*o C 5

根据您的要求,一种方法是使用row_numberWindow函数:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  1, 2, 2, 3, 3, 3
).toDF("col_value")

val window = Window.partitionBy("col_value").orderBy("col_value")
df.withColumn("identifier", row_number().over(window)).
  orderBy("col_value").
  show
// +---------+----------+
// |col_value|identifier|
// +---------+----------+
// |        1|         1|
// |        2|         1|
// |        2|         2|
// |        3|         1|
// |        3|         2|
// |        3|         3|
// +---------+----------+
Run Code Online (Sandbox Code Playgroud)