Bg1*_*850 · 3 · apache-spark · pyspark · spark-dataframe
I'm not sure whether I'm doing something wrong, so please bear with me. My problem can be reproduced with the following data:
from pyspark.sql import Row

df = sc.parallelize([Row(C3=u'Dec 1 2013 12:00AM'),
                     Row(C3=u'Dec 1 2013 12:00AM'),
                     Row(C3=u'Dec 5 2013 12:00AM')]).toDF()
I wrote a function that parses this date string into a datetime object for further processing:
from datetime import datetime

def date_convert(date_str):
    date_format = '%b %d %Y %I:%M%p'
    try:
        dt = datetime.strptime(date_str, date_format)
    except ValueError as v:
        if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
            # trim the unparsed trailing characters and retry
            date_str = date_str[:-(len(v.args[0]) - 26)]
            dt = datetime.strptime(date_str, date_format)
        else:
            raise
    return dt
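As a sanity check, the parsing logic works fine outside Spark. A minimal, Spark-free sketch (the second sample string, with a trailing ` PST`, is a hypothetical input added only to exercise the fallback branch):

```python
from datetime import datetime

def date_convert(date_str):
    # Parse strings like 'Dec 1 2013 12:00AM'; if strptime complains about
    # leftover characters, trim them off the input and retry once.
    date_format = '%b %d %Y %I:%M%p'
    try:
        dt = datetime.strptime(date_str, date_format)
    except ValueError as v:
        if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
            # the error message is 'unconverted data remains: <leftover>',
            # so everything past position 26 is the leftover suffix
            date_str = date_str[:-(len(v.args[0]) - 26)]
            dt = datetime.strptime(date_str, date_format)
        else:
            raise
    return dt

print(date_convert('Dec 1 2013 12:00AM'))      # a datetime for 2013-12-01 00:00
print(date_convert('Dec 5 2013 12:00AM PST'))  # trailing ' PST' is trimmed
```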
Now, if I turn this into a UDF and apply it to my DataFrame, I get unexpected data:
from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
The result looks like this:
Out[40]:
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
But if I use the same function after converting the DataFrame to an RDD, it returns Python datetime objects:
df.rdd.map(lambda row:date_convert(row.C3)).collect()
Out[42]:
[datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 5, 0, 0)]
I would like to achieve the same thing with the DataFrame. How can I do that? What is wrong with this approach (a UDF on the DataFrame)?
This happens because you have to set the return data type of the UDF; if you don't specify one, it defaults to StringType, and your datetime gets coerced to a string. Since you apparently want timestamps, you have to write something like this:
from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())
Views: 1927