Ben*_*Ami 4 apache-spark apache-spark-sql pyspark
我有一个包含 5 列组、日期、a、b 和 c 的 spark 数据框,我想执行以下操作:
给定 df
group date a b c
a 2018-01 2 3 10
a 2018-02 4 5 null
a 2018-03 2 1 null
Run Code Online (Sandbox Code Playgroud)
预期产出
group date a b c
a 2018-01 2 3 10
a 2018-02 4 5 10*3+2=32
a 2018-03 2 1 32*5+4=164
Run Code Online (Sandbox Code Playgroud)
对于每组,通过 b * c + a 计算 c,并将输出用作下一行的 c。
我尝试使用滞后和窗口函数,但找不到正确的方法。
在窗口中,您无法访问当前要计算的列的结果。这将迫使 Spark 按顺序进行计算,应该避免。另一种方法是递归计算变换c_n = func(c_(n-1))
成式只使用的(恒定)值a
,b
和的第一值c
:
这个公式的所有输入值都可以用一个窗口收集,公式本身实现为udf:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
df = ...
w=Window.partitionBy('group').orderBy('date')
df1 = df.withColumn("la", F.collect_list("a").over(w)) \
.withColumn("lb", F.collect_list("b").over(w)) \
.withColumn("c0", F.first("c").over(w))
import numpy as np
def calc_c(c0, a, b):
if c0 is None:
return 0.0
if len(a) == 1:
return float(c0)
e1 = c0 * np.prod(b[:-1])
e2 = 0.0
for i,an in enumerate(a[:-1]):
e2 = e2 + an * np.prod(b[i+1:-1])
return float(e1 + e2)
calc_c_udf= F.udf(calc_c, T.DoubleType())
df1.withColumn("result", calc_c_udf("c0", "la", "lb")) \
.show()
Run Code Online (Sandbox Code Playgroud)
输出:
+-----+-------+---+---+----+---------+---------+---+------+
|group| date| a| b| c| la| lb| c0|result|
+-----+-------+---+---+----+---------+---------+---+------+
| a|2018-01| 2| 3| 10| [2]| [3]| 10| 10.0|
| a|2018-02| 4| 5|null| [2, 4]| [3, 5]| 10| 32.0|
| a|2018-03| 2| 1|null|[2, 4, 2]|[3, 5, 1]| 10| 164.0|
+-----+-------+---+---+----+---------+---------+---+------+
Run Code Online (Sandbox Code Playgroud)