vik*_*kky 1 scala apache-spark apache-spark-sql
我正在尝试将新列添加到我的spark DF。我了解可以使用以下代码:
df.withColumn("row",monotonically_increasing_id)
Run Code Online (Sandbox Code Playgroud)
但是我的用例是:
输入DF:
col value
1
2
2
3
3
3
Run Code Online (Sandbox Code Playgroud)
输出DF:
col_value identifier
1 1
2 1
2 2
3 1
3 2
3 3
Run Code Online (Sandbox Code Playgroud)
关于使用monotonically_increasing或rowWithUniqueIndex进行获取的任何建议。
根据您的要求,一种方法是使用row_numberWindow函数:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
1, 2, 2, 3, 3, 3
).toDF("col_value")
val window = Window.partitionBy("col_value").orderBy("col_value")
df.withColumn("identifier", row_number().over(window)).
orderBy("col_value").
show
// +---------+----------+
// |col_value|identifier|
// +---------+----------+
// | 1| 1|
// | 2| 1|
// | 2| 2|
// | 3| 1|
// | 3| 2|
// | 3| 3|
// +---------+----------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
415 次 |
| 最近记录: |