如何计算 PySpark 中行之间的差异?

Flu*_*uxy 12 python apache-spark apache-spark-sql pyspark

这是我在 PySpark 中的数据框:

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   5       A
2015-10-13 13:00:00+00:00   6       A
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   11      B
Run Code Online (Sandbox Code Playgroud)

的值data是累积的。

我想得到这个结果(连续行之间的差异,按 分组feed):

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   4       A
2015-10-13 13:00:00+00:00   1       A  
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   1       B
Run Code Online (Sandbox Code Playgroud)

我会这样做pandas

df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))
Run Code Online (Sandbox Code Playgroud)

我如何在 PySpark 中做同样的事情?

Sha*_*per 21

您可以使用带有窗口的滞后函数来做到这一点:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

window = Window.partitionBy("feed").orderBy("utc_timestamp")

df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))
Run Code Online (Sandbox Code Playgroud)


mck*_*mck 9

您可以用作lag的替代品shiftcoalesce( , F.lit(0))的替代品fill_value=0

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("feed").orderBy("utc_timestamp")

data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df.withColumn("data", data)
Run Code Online (Sandbox Code Playgroud)

  • 您可以避免在滞后函数中使用合并设置,默认值为 0:lag(col, offset=1, default=0) (2认同)