pySpark withColumn 与函数

use*_*587 3 user-defined-functions apache-spark apache-spark-sql pyspark

我有一个包含 2 列的数据框:account_id并且email_address,现在我想再添加一列updated_email_address,我在该列上调用一些函数email_address来获取updated_email_address. 这是我的代码:

def update_email(email):
  print("== email to be updated: " + email)
  today = datetime.date.today()
  updated = substring(email, -8, 8) + str(today.strftime('%m')) + str(today.strftime('%d')) + "_updated"
  return updated

df.withColumn('updated_email_address', update_email(df.email_address))
Run Code Online (Sandbox Code Playgroud)

但结果显示updated_email_address列为空:

+---------------+--------------+---------------------+
|account_id     |email_address |updated_email_address|
+---------------+--------------+---------------------+
|123456gd7tuhha |abc@test.com  |null           |
|djasevneuagsj1 |cde@test.com  |null           |
+---------------+--------------+---------------+
Run Code Online (Sandbox Code Playgroud)

updated_email在它打印出来的函数内部:

Column<b'(email_address + == email to be udpated: )'>
Run Code Online (Sandbox Code Playgroud)

它还将 df 的列数据类型显示为:

dfData:pyspark.sql.dataframe.DataFrame
account_id:string
email_address:string
updated_email_address:double
Run Code Online (Sandbox Code Playgroud)

为什么updated_email_address列类型是double?

bla*_*hop 9

您正在使用类型调用 Python 函数Column。您必须从中创建 udfupdate_email然后使用它:

update_email_udf = udf(update_email)
Run Code Online (Sandbox Code Playgroud)

但是,我建议您不要使用 UDF 进行此类转换,您可以仅使用 Spark 内置函数来完成此操作(UDF 因性能不佳而闻名):

df.withColumn('updated_email_address',
              concat(substring(col("email_address"), -8, 8), date_format(current_date(), "ddMM"), lit("_updated"))
             ).show()
Run Code Online (Sandbox Code Playgroud)

您可以在此处找到所有 Spark SQL 内置函数。


Ben*_*t F 5

好吧,谢谢你,我必须重新学习我在火花课上忘记的东西

您不能直接使用 WithColumn 调用自定义函数,您需要使用 UserDefinedFunctions (UDF)

这是我如何获得自定义函数来处理您的数据框的快速示例(StringType 是函数的返回类型)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def update_email(email):
  return email+"aaaa"
#df.dtypes

my_udf = udf(lambda x: update_email(x), StringType())

df.withColumn('updated_email_address', my_udf(df.email_address) ).show()
Run Code Online (Sandbox Code Playgroud)