orderBy 如何影响 Pyspark 数据框中的 Window.partitionBy?

All*_*ati 5 window sql-order-by pyspark

我通过一个例子来解释我的问题:
让我们假设我们有一个如下的数据框:

original_df = sc.createDataFrame([('x', 10,), ('x', 15,), ('x', 10,), ('x', 25,), ('y', 20,), ('y', 10,), ('y', 20,)], ["key", "price"] )
original_df.show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+-----+
|key|price|
+---+-----+
|  x|   10|
|  x|   15|
|  x|   10|
|  x|   25|
|  y|   20|
|  y|   10|
|  y|   20|
+---+-----+
Run Code Online (Sandbox Code Playgroud)

并假设我想获取prices每个key使用的列表window

w = Window.partitionBy('key')
original_df.withColumn('price_list', F.collect_list('price').over(w)).show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+-----+----------------+
|key|price|      price_list|
+---+-----+----------------+
|  x|   10|[10, 15, 10, 25]|
|  x|   15|[10, 15, 10, 25]|
|  x|   10|[10, 15, 10, 25]|
|  x|   25|[10, 15, 10, 25]|
|  y|   20|    [20, 10, 20]|
|  y|   10|    [20, 10, 20]|
|  y|   20|    [20, 10, 20]|
+---+-----+----------------+
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好。
但是,如果我想获得一个有序列表,并将其添加orderBy到我的窗口中,w我会得到:

w = Window.partitionBy('key').orderBy('price')
original_df.withColumn('ordered_list', F.collect_list('price').over(w)).show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|        [10, 10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|    [10, 20, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+
Run Code Online (Sandbox Code Playgroud)

这意味着orderBy(某种)也更改rowsBetween了窗口中的行(与所做的相同)!这是不应该做的。

尽管我可以通过rowsBetween在窗口中指定来修复它并获得预期的结果,

w = Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Run Code Online (Sandbox Code Playgroud)

有人可以解释为什么orderBywindow这样影响吗?

Man*_*ngh 13

Spark Window 使用三部分指定:分区、顺序和框架。

  1. 如果没有指定任何部分,则整个数据集将被视为单个窗口。
  2. 当使用列指定分区时,将为列的每个不同值创建一个窗口。如果只指定了分区,那么当为一行计算时间时,将考虑该分区中的所有行。这就是为什么您会看到分区 x 中所有行的所有 4 个值 [10, 15, 10, 25]。
  3. 当指定了分区和排序时,当行函数被评估时,它采用分区中行的排名顺序,并包括所有具有相同或较低值(如果指定了默认 asc 顺序)排名的行。在您的情况下,第一行包括 [10,10] 因为分区中有 2 行具有相同的等级。
  4. 当指定了 Frame 规范 rowsBetween 和 rangeBetween 时,行评估将只选择那些匹配框架规则的行。例如 unbounded 和 currentRow 被指定它会选择当前行和它之前发生的所有行。如果指定了 orderBy,它将相应地更改出现在当前行之前的行。

专门针对您的问题,orderBy 不仅可以对分区数据进行排序,还可以更改行框选择

下面是不同的 windowspec 和相应的输出

Window.orderBy()
+---+-----+----------------------------+
|key|price|price_list                  |
+---+-----+----------------------------+
|x  |15   |[15, 10, 10, 20, 10, 25, 20]|
|x  |10   |[15, 10, 10, 20, 10, 25, 20]|
|y  |10   |[15, 10, 10, 20, 10, 25, 20]|
|y  |20   |[15, 10, 10, 20, 10, 25, 20]|
|x  |10   |[15, 10, 10, 20, 10, 25, 20]|
|x  |25   |[15, 10, 10, 20, 10, 25, 20]|
|y  |20   |[15, 10, 10, 20, 10, 25, 20]|
+---+-----+----------------------------+

Window.partitionBy('key')
+---+-----+----------------+
|key|price|      price_list|
+---+-----+----------------+
|  x|   15|[15, 10, 10, 25]|
|  x|   10|[15, 10, 10, 25]|
|  x|   10|[15, 10, 10, 25]|
|  x|   25|[15, 10, 10, 25]|
|  y|   20|    [20, 10, 20]|
|  y|   10|    [20, 10, 20]|
|  y|   20|    [20, 10, 20]|
+---+-----+----------------+

Window.partitionBy('key').orderBy('price')
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|        [10, 10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|    [10, 20, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

w = Window.partitionBy('key').orderBy(F.desc('price'))
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   25|            [25]|
|  x|   15|        [25, 15]|
|  x|   10|[25, 15, 10, 10]|
|  x|   10|[25, 15, 10, 10]|
|  y|   20|        [20, 20]|
|  y|   20|        [20, 20]|
|  y|   10|    [20, 20, 10]|
+---+-----+----------------+

Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|            [10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|        [10, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

Window.partitionBy('key').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   15|            [15]|
|  x|   10|        [15, 10]|
|  x|   10|    [15, 10, 10]|
|  x|   25|[15, 10, 10, 25]|
|  y|   10|            [10]|
|  y|   20|        [10, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+
Run Code Online (Sandbox Code Playgroud)