根据先前的值和行 Pyspark 填充列

Ben*_*Ami 4 apache-spark apache-spark-sql pyspark

我有一个包含 5 列组、日期、a、b 和 c 的 spark 数据框,我想执行以下操作:

给定 df

group    date      a    b      c
  a     2018-01    2    3     10
  a     2018-02    4    5     null
  a     2018-03    2    1     null
Run Code Online (Sandbox Code Playgroud)

预期产出

group    date      a    b      c
  a     2018-01    2    3     10
  a     2018-02    4    5     10*3+2=32
  a     2018-03    2    1     32*5+4=164
Run Code Online (Sandbox Code Playgroud)

对于每组,通过 b * c + a 计算 c,并将输出用作下一行的 c。

我尝试使用滞后和窗口函数,但找不到正确的方法。

wer*_*ner 7

在窗口中,您无法访问当前要计算的列的结果。这将迫使 Spark 按顺序进行计算,应该避免。另一种方法是递归计算变换c_n = func(c_(n-1))成式只使用的(恒定)值ab和的第一值c

公式

这个公式的所有输入值都可以用一个窗口收集,公式本身实现为udf

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

df = ...
w=Window.partitionBy('group').orderBy('date')
df1 = df.withColumn("la", F.collect_list("a").over(w)) \
  .withColumn("lb", F.collect_list("b").over(w)) \
  .withColumn("c0", F.first("c").over(w))

import numpy as np

def calc_c(c0, a, b):
  if c0 is None:
    return 0.0
  if len(a) == 1:
    return float(c0)
  e1 = c0 * np.prod(b[:-1])
  e2 = 0.0
  for i,an in enumerate(a[:-1]):
    e2 = e2 + an * np.prod(b[i+1:-1])
  return float(e1 + e2)


calc_c_udf= F.udf(calc_c, T.DoubleType())

df1.withColumn("result", calc_c_udf("c0", "la", "lb")) \
  .show()
Run Code Online (Sandbox Code Playgroud)

输出:

+-----+-------+---+---+----+---------+---------+---+------+
|group|   date|  a|  b|   c|       la|       lb| c0|result|
+-----+-------+---+---+----+---------+---------+---+------+
|    a|2018-01|  2|  3|  10|      [2]|      [3]| 10|  10.0|
|    a|2018-02|  4|  5|null|   [2, 4]|   [3, 5]| 10|  32.0|
|    a|2018-03|  2|  1|null|[2, 4, 2]|[3, 5, 1]| 10| 164.0|
+-----+-------+---+---+----+---------+---------+---+------+
Run Code Online (Sandbox Code Playgroud)