pyspark -- 对 Array(Integer()) 类型的列中的值求和的最佳方法

js_*_*_55 9 apache-spark apache-spark-sql pyspark spark-dataframe

可以说这是我的数据框...

name | scores
Dan  |  [10,5,2,12]
Ann  |  [ 12,3,5]
Jon  |  [ ] 
Run Code Online (Sandbox Code Playgroud)

所需的输出类似于

name | scores         | Total
Dan  |  [10,5,2,12]   | 29
Ann  |   [ 12,3,5]    | 20
Jon  |  [ ]           | 0
Run Code Online (Sandbox Code Playgroud)

我按照......制作了一个UDF

sum_cols = udf(lambda arr: if arr == [] then 0 else __builtins__.sum(arr),IntegerType())

df.withColumn('Total', sum_cols(col('scores'))).show()
Run Code Online (Sandbox Code Playgroud)

但是,我了解到 UDF 相对于纯 pySpark 函数来说相对较慢。

有没有办法在没有 UDF 的情况下在 pySpark 中执行上面的代码?

joh*_*d12 9

对于 Spark 3.1+,您可以简单地调用pyspark.sql.functions.aggregate

import pyspark.sql.functions as F
df = df.withColumn(
    "Total",
    F.aggregate("scores", F.lit(0), lambda acc, x: acc + x)
)
Run Code Online (Sandbox Code Playgroud)

F.lit(0.0)请注意,如果列类型不是整数,则应该使用。


Mel*_*.cz 8

您可以使用高阶 SQL 函数AGGREGATE减少函数式编程),如下所示:

import pyspark.sql.functions as F
df = df.select(
  'name',
  F.expr('AGGREGATE(scores, 0, (acc, x) -> acc + x)').alias('Total')
)
Run Code Online (Sandbox Code Playgroud)

第一个参数是数组列,第二个是初始值(应该与您求和的值具有相同的类型,因此如果您的输入不是整数,您可能需要使用“0.0”或“DOUBLE(0)”等)和第三个参数是一个 lambda 函数,它将数组的每个元素添加到一个累加器变量(在开始时这将被设置为初始值 0)。

转换将在单个投影算子中运行,因此将非常有效。此外,您不需要提前知道数组的大小,数组的每一行可以有不同的长度。