Ale*_* R. 26 python apache-spark apache-spark-sql pyspark
作为一个简化示例,我有一个数据框"df",其列为"col1,col2",我想在将函数应用于每列后计算行的最大值:
def f(x):
return (x+1)
max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())
df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
Run Code Online (Sandbox Code Playgroud)
所以如果df:
col1 col2
1 2
3 0
Run Code Online (Sandbox Code Playgroud)
然后
DF2:
col1 col2 result
1 2 3
3 0 4
Run Code Online (Sandbox Code Playgroud)
以上似乎不起作用并产生"无法评估表达式:PythonUDF#f ......"
我绝对肯定"f_udf"在我的桌子上运行得很好,主要问题在于max_udf.
如果不创建额外的列或使用基本的map/reduce,有没有办法完全使用数据帧和udfs?我该如何修改"max_udf"?
我也尝试过:
max_udf=udf(max, IntegerType())
Run Code Online (Sandbox Code Playgroud)
这会产生相同的错误.
我还确认以下工作:
df2=(df.withColumn("temp1", f_udf(df.col1))
.withColumn("temp2", f_udf(df.col2))
df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))
Run Code Online (Sandbox Code Playgroud)
为什么我不能一气呵成呢?
我希望看到一个可以概括为任何函数"f_udf"和"max_udf"的答案.
Chr*_*ler 41
我有一个类似的问题,并在这个stackoverflow问题的答案中找到了解决方案
要将多个列或整行传递给UDF,请使用结构:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())
new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))
new_df.show()
Run Code Online (Sandbox Code Playgroud)
收益:
+----+----+----------+
| a| b|null_count|
+----+----+----------+
|null|null| 2|
| 1|null| 1|
|null| 2| 1|
+----+----+----------+
Run Code Online (Sandbox Code Playgroud)
UserDefinedFunction在接受UDF作为参数时抛出错误.
您可以像下面一样修改max_udf以使其工作.
df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])
max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())
df2 = df.withColumn("result", max_udf(df.col1, df.col2))
Run Code Online (Sandbox Code Playgroud)
要么
def f_udf(x):
return (x + 1)
max_udf = udf(lambda x, y: max(x, y), IntegerType())
## f_udf=udf(f, IntegerType())
df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))
Run Code Online (Sandbox Code Playgroud)
注意:
当且仅当内部函数(此处f_udf)生成有效的SQL表达式时,第二种方法才有效.
它的工作原理在这里,因为f_udf(df.col1)和f_udf(df.col2)作为评估Column<b'(col1 + 1)'>和Column<b'(col2 + 1)'>分别传递给前max_udf.它不适用于任意功能.
如果我们尝试这样的例子,它将无法工作:
from math import exp
df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
20222 次 |
| 最近记录: |