Pyspark:在UDF中传递多个列

sji*_*han 30 apache-spark pyspark spark-dataframe

我正在编写一个用户定义的函数,它将获取除数据帧中第一个之外的所有列并进行求和(或任何其他操作).现在数据框有时可以有3列或4列或更多列.它会有所不同.

我知道我可以硬编码4个列名作为UDF传递,但在这种情况下它会有所不同所以我想知道如何完成它?

以下是第一个示例中的两个示例,我们有两列要添加,第二个示例中我们有三列要添加.

在此输入图像描述

Mar*_*usz 31

如果要传递给UDF的所有列具有相同的数据类型,则可以使用array作为输入参数,例如:

>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+

>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

  • 如何为不同类型的列实现它? (4认同)
  • @构造函数,如果不同类型的总和也可以使用“数组”(即整数和双精度 - >两者都将转换为双精度) (2认同)

小智 16

使用struct而不是array

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
Run Code Online (Sandbox Code Playgroud)

  • 你能解释一下为什么会使用`struct`而不是`array`?我猜这是处理不同类型的列? (4认同)

nee*_*ani 15

没有数组和结构的另一种简单方法。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

def sum(x, y):
    return x + y

sum_cols = udf(sum, IntegerType())

a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
Run Code Online (Sandbox Code Playgroud)

  • 我认为你不需要导入结构。 (7认同)

Роб*_*адь 6

也许这是一个迟到的答案,但我不喜欢在没有必要的情况下使用 UDF,所以:

from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])

calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))

df.withColumn("Result", calculate)
#
#id v1  v2  v3  Result
#a  1   2   5   8
#b  2   3   7   12
#c  3   4   8   15
Run Code Online (Sandbox Code Playgroud)

在这里你可以使用任何在Column. 另外,如果你想编写udf具有特定逻辑的自定义,你可以使用它,因为Column提供树执行操作。无需收集到数组并对其求和。

如果与进程作为数组操作相比,从性能角度来看会很糟糕,让我们看一下物理计划,在我的情况和数组情况下,在我的情况和array情况下。

我的情况:

== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]
Run Code Online (Sandbox Code Playgroud)

数组情况:

== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
   +- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]
Run Code Online (Sandbox Code Playgroud)

如果可能的话 - 我们需要避免使用 UDF,因为 Catalyst 不知道如何优化它们