使用udf在PySpark数据框中将纪元转换为日期时间

aho*_*osh 4 python apache-spark apache-spark-sql pyspark

我有一个具有以下架构的PySpark数据框:

root
 |-- epoch: double (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

历元以秒为单位,应转换为日期时间。为此,我定义了一个用户定义的函数(udf),如下所示:

from pyspark.sql.functions import udf    
import time
def epoch_to_datetime(x):
    return time.localtime(x)
    # return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x))
    # return x * 0 + 1

epoch_to_datetime_udf = udf(epoch_to_datetime, DoubleType())
df.withColumn("datetime", epoch_to_datetime(df2.epoch)).show()
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

---> 21     return time.localtime(x)
    22     # return x * 0 + 1
    23 
    TypeError: a float is required
Run Code Online (Sandbox Code Playgroud)

如果我仅返回x + 1该函数,它将起作用。尝试float(x)or float(str(x))numpy.float(x)in time.localtime(x)都无济于事,但仍然出现错误。之外udftime.localtime(1.514687216E9)或其他数字工作正常。使用datetime包转换epoch为datetim会导致类似错误。

似乎timedatetime软件包都不喜欢DoubleType从PySpark加载。有什么想法可以解决这个问题吗?谢谢。

Ram*_*jan 7

您不需要udf为此的功能

您只需要将double epoch列强制转换为timestampType(),然后使用data_format如下功能

from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.date_format(df.epoch.cast(dataType=t.TimestampType()), "yyyy-MM-dd"))
Run Code Online (Sandbox Code Playgroud)

这会给你一个字符串日期

root
 |-- epoch: string (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

您可以使用to_date以下功能

from pyspark.sql import functions as f
from pyspark.sql import types as t
df.withColumn('epoch', f.to_date(df.epoch.cast(dataType=t.TimestampType())))
Run Code Online (Sandbox Code Playgroud)

这将使你date作为datatypeepoch

root
 |-- epoch: date (nullable = true)
 |-- var1: double (nullable = true)
 |-- var2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我希望答案是有帮助的


San*_*wad 5

Ramesh Maharjan 的答案不支持在时间戳中获取毫秒或微秒。添加对毫秒的支持的更新答案如下:

实施Dao Thi 的回答中建议的方法

import pyspark.sql.functions as F
df = spark.createDataFrame([('22-Jul-2018 04:21:18.792 UTC', ),('23-Jul-2018 04:21:25.888 UTC',)], ['TIME'])
df.show(2,False)
df.printSchema()
Run Code Online (Sandbox Code Playgroud)

输出:

+----------------------------+
|TIME                        |
+----------------------------+
|22-Jul-2018 04:21:18.792 UTC|
|23-Jul-2018 04:21:25.888 UTC|
+----------------------------+
root
|-- TIME: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

将字符串时间格式(包括毫秒)转换为unix_timestamp(double)。使用子字符串方法从字符串中提取毫秒(start_position = -7,length_of_substring = 3)并将毫秒单独添加到unix_timestamp。(转换为子字符串以浮动以进行添加)

df1 = df.withColumn("unix_timestamp",F.unix_timestamp(df.TIME,'dd-MMM-yyyy HH:mm:ss.SSS z') + F.substring(df.TIME,-7,3).cast('float')/1000)
Run Code Online (Sandbox Code Playgroud)

在 Spark中将unix_timestamp(double)转换为时间戳数据类型。

df2 = df1.withColumn("TimestampType",F.to_timestamp(df1["unix_timestamp"]))
df2.show(n=2,truncate=False)
Run Code Online (Sandbox Code Playgroud)

这将为您提供以下输出

+----------------------------+----------------+-----------------------+
|TIME                        |unix_timestamp  |TimestampType          |
+----------------------------+----------------+-----------------------+
|22-Jul-2018 04:21:18.792 UTC|1.532233278792E9|2018-07-22 04:21:18.792|
|23-Jul-2018 04:21:25.888 UTC|1.532319685888E9|2018-07-23 04:21:25.888|
+----------------------------+----------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

检查架构:

df2.printSchema()


root
 |-- TIME: string (nullable = true)
 |-- unix_timestamp: double (nullable = true)
 |-- TimestampType: timestamp (nullable = true)
Run Code Online (Sandbox Code Playgroud)


Tim*_*Tim 5

对我来说,我需要将长时间戳转换回日期格式。

我使用了对我有用的@Glicth 评论。- 可能会帮助别人。

from pyspark.sql import functions as f
from pyspark.sql.functions import col,lit
from datetime import datetime

df001 = spark.createDataFrame([(1639518261056, ),(1639518260824,)], ['timestamp_long'])
df002 = df001.withColumn("timestamp",f.to_timestamp(df001['timestamp_long']/1000))
df001.printSchema()
display(df002)
Run Code Online (Sandbox Code Playgroud)

图式

root
 |-- timestamp_long: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)

使用 Databricks:输出display(df002) 在此输入图像描述