Pyspark 根据其他列和运行计数器添加列

jus*_*dev 1 pyspark

我在 pyspark 数据框中有数据(这是一个非常大的表,有 900M 行)

数据框包含具有以下值的列:

+---------------+
|prev_display_id|
+---------------+
|           null|
|           null|
|           1062|
|           null|
|           null|
|           null|
|           null|
|       18882624|
|       11381128|
|           null|
|           null|
|           null|
|           null|
|           2779|
|           null|
|           null|
|           null|
|           null|
+---------------+
Run Code Online (Sandbox Code Playgroud)

我正在尝试基于此列生成一个新列,如下所示:

+---------------+------+
|prev_display_id|result|
+---------------+------+
|           null|     0|
|           null|     1|
|           1062|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|       18882624|     0|
|       11381128|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|           2779|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
+---------------+------+
Run Code Online (Sandbox Code Playgroud)

新列的函数类似于:

new_col = 0 if (prev_display_id!=null) else col = col+1
Run Code Online (Sandbox Code Playgroud)

其中 col 就像一个正在运行的计数器,当遇到非空值时,它会重置为零。

如何在 pyspark 中有效地完成此操作?

更新

我尝试了下面@anki 建议的解决方案。我非常适合小型数据集,但它会产生以下错误:

WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Run Code Online (Sandbox Code Playgroud)

不幸的是,对于我的大数据集来说,它似乎杀死了集群。请参阅下图,了解在具有 2 个 rd5.2xlarge 数据节点的大数据集上运行时出现的错误:

在此输入图像描述

知道如何解决这个问题吗?

ank*_*_91 5

据我了解,您可以创建一个 id 列,然后在不为 null 的monotonically_increasing_id情况下在窗口上求和,然后取按该列分区的行号并减 1:prev_display_id

w = Window.orderBy(F.monotonically_increasing_id())
w1 = F.sum((F.col("prev_display_id").isNotNull()).cast("integer")).over(w)

(df.withColumn("result",F.row_number()
 .over(Window.partitionBy(w1).orderBy(w1))-1).drop("idx")).show()
Run Code Online (Sandbox Code Playgroud)
+---------------+------+
|prev_display_id|result|
+---------------+------+
|           null|     0|
|           null|     1|
|           1062|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|       18882624|     0|
|       11381128|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|           2779|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
+---------------+------+
Run Code Online (Sandbox Code Playgroud)