使用搜索和条件查找两列值之间的差异

use*_*082 5 python pyspark pyspark-sql

在 pyspark 中,我有一个如下所示的数据框,其中根据 id 和 k1 的值对行进行排序。此外,每一行都有一个唯一的升序编号(rowid)。

-----------------------
rowid | id | k1  | k2 |
-----------------------
1     | 1  | v1 | l1  |
2     | 1  | v1 | v1  |
3     | 1  | v1 | l2  |
4     | 2  | v2 | v2  |
5     | 2  | v2 | l3  |
6     | 3  | v3 | l3  |
----------------------
Run Code Online (Sandbox Code Playgroud)

对于id的每个唯一值,我想计算k1==k2的第一行的rowid与观察到该id的记录的第一行对应的rowid的差+1,并存储结果在新列中(即排名)。输出应如下所示。

----------------
 id | k1  |rank |
-----------------
 1  | v1  | 2   |
 2  | v2  | 1   |
 3  | v3  | 0   | 
-----------------
Run Code Online (Sandbox Code Playgroud)

例如,对于 id = 1,当 rowid=2 时 k1==k2 的值。第一次观察到 id=1 是在 rowid=1 时。将 2-1+1=2 放在排名列中。对于 id =3,我们没有任何记录与列 k1 和 k2 的值匹配。因此,用 0(或 null)填充等级列。

我假设这涉及基于 id 的 groupBy,但我不确定如何获取与 k1 和 k2 列匹配的行以及与每个唯一 id 对应的第一个 rowid 对应的索引。

pau*_*ult 1

groupBy您可以使用带有onid和 的API 函数来执行此操作k1,这应该比使用 a 更快udf

import pyspark.sql.functions as f

df.groupBy("id", "k1")\
    .agg(
        f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
        f.min("rowid").alias("first_row")
    )\
    .select("id", "k1", (f.col("first_equal")-f.col("first_row")+1).alias("rank"))\
    .fillna(0)\
    .show()
#+---+---+----+
#| id| k1|rank|
#+---+---+----+
#|  1| v1|   2|
#|  2| v2|   1|
#|  3| v3|   0|
#+---+---+----+
Run Code Online (Sandbox Code Playgroud)

的计算rank可以分为两个聚合步骤:

  • 第一个聚合获取每个,对rowid的最小值。k1==k2idk1
  • 第二个聚合取rowid每个id,k1对的最小值。

您取这些值的差异(+1根据您的要求),最后用 填充任何null0


更新:另一种方法使用row_number

from pyspark.sql import Window

# you can define your own order by column
w = Window.partitionBy("id", "k1").orderBy("rowid")

df.withColumn("rank", f.when(f.expr("k1 = k2"), f.row_number().over(w)))\
    .groupBy("id", "k1")\
    .agg(f.min("rank"))\
    .fillna(0)\
    .show()
# Same as above
Run Code Online (Sandbox Code Playgroud)