PySpark 中日期时间的舍入时间

LaS*_*Sul 1 python user-defined-functions apache-spark pyspark

我正在尝试使用 pyspark 和 udf 来调整小时数。

该函数在 python 中工作正常,但在使用 pyspark 时效果不佳。

输入是:

date = Timestamp('2016-11-18 01:45:55') # type is pandas._libs.tslibs.timestamps.Timestamp

def time_feature_creation_spark(date):
    return date.round("H").hour

time_feature_creation_udf = udf(lambda x : time_feature_creation_spark(x), IntegerType())
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

然后我在提供 spark 的函数中使用它:

data = data.withColumn("hour", time_feature_creation_udf(data["date"])

错误是:

类型错误:“列”对象不可调用

预期输出只是日期时间中最接近的小时(例如 20h45 最接近 21h,因此返回 21)

LN_*_*N_P 9

A nicer version than /3600*3600 is using the built-in function date_trunc

import pyspark.sql.functions as F
return df.withColumn("hourly_timestamp", F.date_trunc("hour", df.timestamp))
Run Code Online (Sandbox Code Playgroud)

other formats besides hour are

year’, ‘yyyy’, ‘yy’, ‘month’, ‘mon’, ‘mm’, ‘day’, ‘dd’, ‘hour’, ‘minute’, ‘second’, ‘week’, ‘quarter’


ril*_*yss 5

您不能只在 Pandas 数据帧上应用 pyspark udf。

如果要在spark中进行这种转换,需要先将pandas数据帧转换为spark数据帧。

date1 = Timestamp('2016-11-18 01:45:55')
date2 = Timestamp('2016-12-18 01:45:55')
df = pd.DataFrame({"date": [date1, date2]})

data = sqlContext.createDataFrame(df)
Run Code Online (Sandbox Code Playgroud)

然后要计算四舍五入的小时数,您不需要 UDF。这条线可以解决问题。

result = data.withColumn("hour", hour((round(unix_timestamp("date")/3600)*3600).cast("timestamp")))
Run Code Online (Sandbox Code Playgroud)

它的作用是:

  1. timestamp使用以秒为单位将时间转换为 unix 时间unix_timestamp()
  2. 除以 3600 到小时,四舍五入,然后乘以 3600
  3. 使用 cast()
  4. 使用hour()函数提取小时

Spark 使用它自己的数据类型,因此当您将 Pandas 数据帧pandas._libs.tslibs.timestamps.Timestamp转换为Sparkpyspark.sql.types.TimestampType数据帧时, a将转换为 a ,因此 Pandas 函数不再起作用。