use*_*082 5 python pyspark pyspark-sql
在 pyspark 中,我有一个如下所示的数据框,其中根据 id 和 k1 的值对行进行排序。此外,每一行都有一个唯一的升序编号(rowid)。
-----------------------
rowid | id | k1 | k2 |
-----------------------
1 | 1 | v1 | l1 |
2 | 1 | v1 | v1 |
3 | 1 | v1 | l2 |
4 | 2 | v2 | v2 |
5 | 2 | v2 | l3 |
6 | 3 | v3 | l3 |
----------------------
Run Code Online (Sandbox Code Playgroud)
对于id的每个唯一值,我想计算k1==k2的第一行的rowid与观察到该id的记录的第一行对应的rowid的差+1,并存储结果在新列中(即排名)。输出应如下所示。
----------------
id | k1 |rank |
-----------------
1 | v1 | 2 |
2 | v2 | 1 |
3 | v3 | 0 |
-----------------
Run Code Online (Sandbox Code Playgroud)
例如,对于 id = 1,当 rowid=2 时 k1==k2 的值。第一次观察到 id=1 是在 rowid=1 时。将 2-1+1=2 放在排名列中。对于 id =3,我们没有任何记录与列 k1 和 k2 的值匹配。因此,用 0(或 null)填充等级列。
我假设这涉及基于 id 的 groupBy,但我不确定如何获取与 k1 和 k2 列匹配的行以及与每个唯一 id 对应的第一个 rowid 对应的索引。
groupBy您可以使用带有onid和 的API 函数来执行此操作k1,这应该比使用 a 更快udf:
import pyspark.sql.functions as f
df.groupBy("id", "k1")\
.agg(
f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
f.min("rowid").alias("first_row")
)\
.select("id", "k1", (f.col("first_equal")-f.col("first_row")+1).alias("rank"))\
.fillna(0)\
.show()
#+---+---+----+
#| id| k1|rank|
#+---+---+----+
#| 1| v1| 2|
#| 2| v2| 1|
#| 3| v3| 0|
#+---+---+----+
Run Code Online (Sandbox Code Playgroud)
的计算rank可以分为两个聚合步骤:
rowid的最小值。k1==k2idk1rowid每个id,k1对的最小值。您取这些值的差异(+1根据您的要求),最后用 填充任何null值0。
更新:另一种方法使用row_number:
from pyspark.sql import Window
# you can define your own order by column
w = Window.partitionBy("id", "k1").orderBy("rowid")
df.withColumn("rank", f.when(f.expr("k1 = k2"), f.row_number().over(w)))\
.groupBy("id", "k1")\
.agg(f.min("rank"))\
.fillna(0)\
.show()
# Same as above
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
594 次 |
| 最近记录: |