Why does a Python UDF return an unexpected datetime value, while the same function applied on an RDD gives correct datetime objects?

Bg1*_*850 3 apache-spark pyspark spark-dataframe

I'm not sure whether I'm doing something wrong, so forgive me if I am. My problem can be reproduced with the following data:

from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  1 2013 12:00AM'),
 Row(C3=u'Dec  5 2013 12:00AM')]).toDF()

I wrote a function to parse this date string into a datetime object for further processing:

from datetime import datetime

def date_convert(date_str):
    date_format = '%b %d %Y %I:%M%p'
    try:
        dt = datetime.strptime(date_str, date_format)
    except ValueError as v:
        if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
            # Strip the trailing characters strptime could not consume
            # (26 is the length of the message prefix), then retry the
            # parse on the truncated input string.
            date_str = date_str[:-(len(v.args[0]) - 26)]
            dt = datetime.strptime(date_str, date_format)
        else:
            raise
    return dt
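As a quick sanity check outside Spark, the format string does parse the sample values, including the double space between the month and the day (`strptime` treats whitespace in the format as matching any run of whitespace in the input):

```python
from datetime import datetime

date_format = '%b %d %Y %I:%M%p'
# The double space before the day is fine: whitespace in the format
# matches one or more whitespace characters in the input.
print(datetime.strptime(u'Dec  1 2013 12:00AM', date_format))
# 2013-12-01 00:00:00
```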

Now, if I wrap this in a UDF and apply it to my DataFrame, I get unexpected data:

from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)

The result looks like this:

Out[40]: 
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
 Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]

But if I convert the DataFrame to an RDD and use the same function there, it returns proper Python datetime objects:

df.rdd.map(lambda row:date_convert(row.C3)).collect()
Out[42]: 
[datetime.datetime(2013, 12, 1, 0, 0),
 datetime.datetime(2013, 12, 1, 0, 0),
 datetime.datetime(2013, 12, 5, 0, 0)]

I want to achieve the same thing with the DataFrame. How can I do that, and what is wrong with this approach (a UDF on the DataFrame)?

Alb*_*nto 6

That happens because you have to set the return data type of your UDF. Apparently you are trying to get timestamps; if that is the case, you have to write something like this:

from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())
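A minimal end-to-end sketch of the fix (the parser is the one from the question; the Spark wiring is kept inside a function and assumes a local Spark installation, so the parsing logic can also be used on its own):

```python
from datetime import datetime


def date_convert(date_str):
    # Same parser as in the question, with the retry applied
    # to the input string.
    date_format = '%b %d %Y %I:%M%p'
    try:
        dt = datetime.strptime(date_str, date_format)
    except ValueError as v:
        if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
            date_str = date_str[:-(len(v.args[0]) - 26)]
            dt = datetime.strptime(date_str, date_format)
        else:
            raise
    return dt


def main():
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.functions import udf
    from pyspark.sql.types import TimestampType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([Row(C3=u'Dec  1 2013 12:00AM'),
                                Row(C3=u'Dec  5 2013 12:00AM')])
    # Declaring TimestampType makes Spark map the returned Python
    # datetime to a real timestamp column; with the default
    # (StringType) the value is stringified on the way back.
    date_convert_udf = udf(date_convert, TimestampType())
    df.select(date_convert_udf(df.C3).alias("datetime")).show(truncate=False)


if __name__ == '__main__':
    main()
```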