Spark使用上一行的值向数据框添加新列

Kit*_*ito 33 python dataframe apache-spark apache-spark-sql pyspark

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.

zer*_*323 37

您可以lag按如下方式使用窗口功能

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+
Run Code Online (Sandbox Code Playgroud)

但是有一些重要的问题:

  1. 如果你需要一个全局操作(没有被其他一些列/列分区),那么效率非常低.
  2. 您需要一种自然的方式来订购数据.

虽然第二个问题几乎从来都不是问题,但第一个问题可能是一个交易破坏者.如果是这种情况,您应该简单地将您转换DataFrame为RDD并lag手动计算.参见例如:

其他有用的链接: