Pyspark:使用带参数的UDF创建新列

Rob*_*inL 0 apache-spark pyspark pyspark-sql

我有一个用户定义的函数如下所示,我想用它来导出我的数据帧中的新列:

def to_date_formatted(date_str, format):
    if date_str == '' or date_str is None:
        return None
    try:
        dt = datetime.datetime.strptime(date_str, format)
    except:
        return None
    return dt.date()

spark.udf.register("to_date_udf", to_date_formatted, DateType())
Run Code Online (Sandbox Code Playgroud)

我可以通过运行sql来使用它select to_date_udf(my_date, '%d-%b-%y') as date.请注意将自定义格式作为参数传递给函数的功能

但是,我很难使用pyspark列表达式语法而不是sql来使用它

我想写一些类似的东西:

df.with_column("date", to_date_udf('my_date', %d-%b-%y')
Run Code Online (Sandbox Code Playgroud)

但这会导致错误.我怎样才能做到这一点?

[编辑:在此特定示例中,在Spark 2.2+中,您可以使用内置to_date函数提供可选的格式参数.我现在正在使用Spark 2.0,所以这对我来说是不可能的.另外值得注意的是我提供了这个例子,但我对提供UDF参数的一般语法感兴趣,而不是日期转换的细节]

Rob*_*inL 6

我找到了三个实现此目的的选项:

设置可重现的示例

import pandas as pd 
import datetime 

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import DateType
from pyspark.sql.functions import expr, lit

sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

def to_date_formatted(date_str, format):
    if date_str == '' or date_str is None:
        return None
    try:
        dt = datetime.datetime.strptime(date_str, format)
    except:
        return None
    return dt.date()

data = {}
data["date_str_1"] = ["01-Dec-17", "05-Jan-12", "08-Mar-15"]
data["date_str_2"] = ["01/12/17", "05/01/12", "08/03/15"]

df = pd.DataFrame(data)
df = spark.createDataFrame(df)
df.registerTempTable('df')
Run Code Online (Sandbox Code Playgroud)

选项1

from pyspark.sql.functions import udf
to_date_udf = udf(to_date_formatted, DateType())
df = df.withColumn("parsed_date", to_date_udf('date_str_1', lit('%d-%b-%y')))
df.show()
Run Code Online (Sandbox Code Playgroud)

选项2

spark.udf.register("to_date_udf", to_date_formatted, DateType())
ex = "to_date_udf(date_str_1, '%d-%b-%y') as d"
df = df.withColumn("parsed_date", expr(ex))

df.show()
Run Code Online (Sandbox Code Playgroud)

选项3

选项3只是为了讨论这个to_date_formatted功能:

from functools import partial
curried_to_date = partial(to_date_formatted, format="%d-%b-%y")

curried_to_date = udf(curried_to_date, DateType())
df.withColumn("parsed_date", curried_to_date('date_str_1'))
Run Code Online (Sandbox Code Playgroud)