如何在 PySpark 中的 rowsBetween 中使用 unboundedPreceding、unboundedFollowing 和 currentRow

Myk*_*tko 2 python group-by pyspark

pyspark.sql.Window.rowsBetween我对接受Window.unboundedPrecedingWindow.unboundedFollowingWindow.currentRow对象作为start和参数的方法有点困惑end。您能否Window通过一些示例解释一下该函数的工作原理以及如何正确使用对象?谢谢你!

小智 10

基本上,顾名思义,行之间/范围之间有助于限制窗口内考虑的行数。

让我们举一个简单的例子

从数据开始

dfw=spark.createDataFrame([("abc",1,100),("abc",2,200),("abc",3,300),("abc",4,200),("abc",5,100)],"name string,id int,price int")

#output
+----+---+-----+
|name| id|price|
+----+---+-----+
| abc|  1|  100|
| abc|  2|  200|
| abc|  3|  300|
| abc|  4|  200|
| abc|  5|  100|
+----+---+-----+
Run Code Online (Sandbox Code Playgroud)

现在,通过这些数据,让我们尝试找到每行的运行最大值,即最大值

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id"))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|100|
| abc|  2|  200|200|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

因此,正如预期的那样,它从上到下逐一查看每个价格,并填充它得到的最大值,此行为称为 start= Window.unboundedPreceding 到 end=Window.currentRow

现在将值之间的行更改为 start= Window.unboundedPreceding 到 end=Window.unbounded 接下来我们将得到如下结果

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

现在,正如您在同一窗口中看到的那样,它向下查找最大值的所有值,而不是将其限制为当前行

现在第三个将是 start=Window.currentRow 和 end =Window.unboundedFollowing

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.currentRow,Window.unboundedFollowing))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|200|
| abc|  5|  100|100|
+----+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

现在它只向下查找从当前行开始的最大值。

此外,它不仅限于按原样使用这 3 个,您甚至可以 start=Window.currentRow-1 和 end =Window.currentRow+1,因此不必查找上方或下方的所有值,它只会查找上方 1 行和 1 行以下

像这样

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.currentRow-1,Window.currentRow+1))).show()
# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|200|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|200|
+----+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

所以你可以想象它是一个窗口内的窗口,它围绕当前行进行处理

  • 这里的答案很好,我知道这是一个老问题,但只是补充一点,您不需要在 rowsBetween 中指定 window.currentRow,因为此处的值会自动从当前行引用。因此 rowsBetween(-1, 1) 也会做同样的事情。 (2认同)