aha*_*jib 2 python user-defined-functions pyspark
我正在编写一个udf,它将使用两个dataframe列以及一个额外的参数(一个常量值),并且应该在dataframe中添加一个新列。我的功能看起来像:
def udf_test(column1, column2, constant_var):
if column1 == column2:
return column1
else:
return constant_var
Run Code Online (Sandbox Code Playgroud)
另外,我正在执行以下操作以传递多列:
apply_test = udf(udf_test, StringType())
df = df.withColumn('new_column', apply_test('column1', 'column2'))
Run Code Online (Sandbox Code Playgroud)
除非我删除了constant_varas作为函数的第三个参数,否则这现在不起作用,但我确实需要它。因此,我尝试执行以下操作:
constant_var = 'TEST'
apply_test = udf(lambda x: udf_test(x, constant_var), StringType())
df = df.withColumn('new_column', apply_test(constant_var)(col('column1', 'column2')))
Run Code Online (Sandbox Code Playgroud)
和
apply_test = udf(lambda x,y: udf_test(x, y, constant_var), StringType())
Run Code Online (Sandbox Code Playgroud)
以上都不对我有用。我基于此以及这些 stackoverflow帖子获得了这些想法,并且我认为我的问题与两者之间的区别是显而易见的。任何帮助将非常感激。
注意:我在这里只是为了讨论而简化了功能,而实际功能却更为复杂。我知道可以使用whenand otherwise语句完成此操作。
您不必使用用户定义的函数。您可以使用when()和else()函数:
from pyspark.sql import functions as f
df = df.withColumn('new_column',
f.when(f.col('col1') == f.col('col2'), f.col('col1'))
.otherwise('other_value'))
Run Code Online (Sandbox Code Playgroud)
另一种方法是生成用户定义的函数。但是,使用udf会对性能产生负面影响,因为必须将数据与python进行反序列化。要生成用户定义的函数,您需要一个返回(用户定义的)函数的函数。例如:
def generate_udf(constant_var):
def test(col1, col2):
if col1 == col2:
return col1
else:
return constant_var
return f.udf(test, StringType())
df = df.withColumn('new_column',
generate_udf('default_value')(f.col('col1'), f.col('col2')))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2100 次 |
| 最近记录: |