PySpark - 将上一行和下一行附加到当前行

Chr*_*s C 2 python dataframe apache-spark apache-spark-sql pyspark

假设我有一个像这样的 PySpark 数据框:

1 0 1 0
0 0 1 1
0 1 0 1
Run Code Online (Sandbox Code Playgroud)

如何将一行的最后一列和下一列附加到当前行,如下所示:

1 0 1 0 0 0 0 0 0 0 1 1
0 0 1 1 1 0 1 0 0 1 0 1
0 1 0 1 0 0 1 1 0 0 0 0
Run Code Online (Sandbox Code Playgroud)

我熟悉.withColumn()添加列的方法,但不确定我会在该字段中放置什么。

它们"0 0 0 0"是占位符值,因为这些行之前和之后没有先前或后续行。

pau*_*ult 7

您可以使用pyspark.sql.functions.lead()andpyspark.sql.functions.lag()但首先您需要一种对行进行排序的方法。如果您还没有确定顺序的列,则可以使用以下命令创建一个列pyspark.sql.functions.monotonically_increasing_id()

然后将其与函数结合使用Window

例如,如果您有以下 DataFrame df

df.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  1|  0|  1|  0|
#|  0|  0|  1|  1|
#|  0|  1|  0|  1|
#+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

你可以这样做:

from pyspark.sql import Window
import pyspark.sql.functions as f

cols = df.columns
df = df.withColumn("id", f.monotonically_increasing_id())
df.select(
    "*", 
    *([f.lag(f.col(c),default=0).over(Window.orderBy("id")).alias("prev_"+c) for c in cols] + 
      [f.lead(f.col(c),default=0).over(Window.orderBy("id")).alias("next_"+c) for c in cols])
).drop("id").show()
#+---+---+---+---+------+------+------+------+------+------+------+------+
#|  a|  b|  c|  d|prev_a|prev_b|prev_c|prev_d|next_a|next_b|next_c|next_d|
#+---+---+---+---+------+------+------+------+------+------+------+------+
#|  1|  0|  1|  0|     0|     0|     0|     0|     0|     0|     1|     1|
#|  0|  0|  1|  1|     1|     0|     1|     0|     0|     1|     0|     1|
#|  0|  1|  0|  1|     0|     0|     1|     1|     0|     0|     0|     0|
#+---+---+---+---+------+------+------+------+------+------+------+------+
Run Code Online (Sandbox Code Playgroud)