LaS*_*Sul 1 python user-defined-functions apache-spark pyspark
我正在尝试使用 pyspark 和 udf 来调整小时数。
该函数在 python 中工作正常,但在使用 pyspark 时效果不佳。
输入是:
date = Timestamp('2016-11-18 01:45:55') # type is pandas._libs.tslibs.timestamps.Timestamp
def time_feature_creation_spark(date):
return date.round("H").hour
time_feature_creation_udf = udf(lambda x : time_feature_creation_spark(x), IntegerType())
Run Code Online (Sandbox Code Playgroud)
然后我在提供 spark 的函数中使用它:
data = data.withColumn("hour", time_feature_creation_udf(data["date"])
错误是:
类型错误:“列”对象不可调用
预期输出只是日期时间中最接近的小时(例如 20h45 最接近 21h,因此返回 21)
A nicer version than /3600*3600 is using the built-in function date_trunc
import pyspark.sql.functions as F
return df.withColumn("hourly_timestamp", F.date_trunc("hour", df.timestamp))
Run Code Online (Sandbox Code Playgroud)
other formats besides hour are
year’, ‘yyyy’, ‘yy’, ‘month’, ‘mon’, ‘mm’, ‘day’, ‘dd’, ‘hour’, ‘minute’, ‘second’, ‘week’, ‘quarter’
您不能只在 Pandas 数据帧上应用 pyspark udf。
如果要在spark中进行这种转换,需要先将pandas数据帧转换为spark数据帧。
date1 = Timestamp('2016-11-18 01:45:55')
date2 = Timestamp('2016-12-18 01:45:55')
df = pd.DataFrame({"date": [date1, date2]})
data = sqlContext.createDataFrame(df)
Run Code Online (Sandbox Code Playgroud)
然后要计算四舍五入的小时数,您不需要 UDF。这条线可以解决问题。
result = data.withColumn("hour", hour((round(unix_timestamp("date")/3600)*3600).cast("timestamp")))
Run Code Online (Sandbox Code Playgroud)
它的作用是:
timestamp使用以秒为单位将时间转换为 unix 时间unix_timestamp()cast()hour()函数提取小时Spark 使用它自己的数据类型,因此当您将 Pandas 数据帧pandas._libs.tslibs.timestamps.Timestamp转换为Sparkpyspark.sql.types.TimestampType数据帧时, a将转换为 a ,因此 Pandas 函数不再起作用。
| 归档时间: |
|
| 查看次数: |
5489 次 |
| 最近记录: |