Pyspark --- 添加具有每组值的新列

use*_*546 2 group-by dataframe apache-spark pyspark

假设我有以下数据集:

a | b   
1 | 0.4 
1 | 0.8 
1 | 0.5 
2 | 0.4
2 | 0.1
Run Code Online (Sandbox Code Playgroud)

我想添加一个名为其中的值是每个组中值的本地确定的“标签”新列一个。组ab的最高值标记为 1,所有其他值标记为 0。

输出将如下所示:

a | b   | label
1 | 0.4 | 0
1 | 0.8 | 1
1 | 0.5 | 0
2 | 0.4 | 1
2 | 0.1 | 0
Run Code Online (Sandbox Code Playgroud)

如何使用 PySpark 有效地做到这一点?

zer*_*323 6

你可以用窗口函数来做到这一点。首先你需要几个进口:

from pyspark.sql.functions import desc, row_number, when
from pyspark.sql.window import Window
Run Code Online (Sandbox Code Playgroud)

和窗口定义:

w = Window().partitionBy("a").orderBy(desc("b"))
Run Code Online (Sandbox Code Playgroud)

最后你使用这些:

df.withColumn("label", when(row_number().over(w) == 1, 1).otherwise(0))
Run Code Online (Sandbox Code Playgroud)

例如数据:

df = sc.parallelize([
    (1, 0.4), (1, 0.8), (1, 0.5), (2, 0.4), (2, 0.1)
]).toDF(["a", "b"])
Run Code Online (Sandbox Code Playgroud)

结果是:

+---+---+-----+
|  a|  b|label|
+---+---+-----+
|  1|0.8|    1|
|  1|0.5|    0|
|  1|0.4|    0|
|  2|0.4|    1|
|  2|0.1|    0|
+---+---+-----+
Run Code Online (Sandbox Code Playgroud)