在Pyspark中将时间戳更改为UTC格式

Aru*_*u P 2 apache-spark pyspark spark-dataframe

我有一个输入数据ip_df),此数据框中的数据如下所示:

id            timestamp_value
1       2017-08-01T14:30:00+05:30
2       2017-08-01T14:30:00+06:30
3       2017-08-01T14:30:00+07:30
Run Code Online (Sandbox Code Playgroud)

我需要创建一个新的dataframeop_df),其中我需要将时间戳值转换为UTC格式。因此最终输出数据帧将如下所示:

id            timestamp_value
1       2017-08-01T09:00:00+00:00
2       2017-08-01T08:00:00+00:00
3       2017-08-01T07:00:00+00:00
Run Code Online (Sandbox Code Playgroud)

我想使用PySpark实现它。有人可以帮我吗?任何帮助将是适当的。

edd*_*ies 7

如果您绝对需要严格按照指示格式化时间戳,即时区表示为“ +00:00”,那么我认为使用已经建议的UDF 是您的最佳选择。

但是,如果您可以容忍时区的表示形式稍有不同,例如“ +0000”(不使用冒号分隔符)或“ Z”,则可以在没有UDF的情况下执行此操作,根据大小的不同,UDF可能对您的性能更好数据集。

给定以下数据表示

+---+-------------------------+
|id |timestamp_value          |
+---+-------------------------+
|1  |2017-08-01T14:30:00+05:30|
|2  |2017-08-01T14:30:00+06:30|
|3  |2017-08-01T14:30:00+07:30|
+---+-------------------------+
Run Code Online (Sandbox Code Playgroud)

如:

l = [(1, '2017-08-01T14:30:00+05:30'), (2, '2017-08-01T14:30:00+06:30'), (3, '2017-08-01T14:30:00+07:30')]
ip_df = spark.createDataFrame(l, ['id', 'timestamp_value'])
Run Code Online (Sandbox Code Playgroud)

这里timestamp_value是一个String,你可以做以下(这将使用TO_TIMESTAMP会话的本地时区支持,这是在星火2.2中引入):

from pyspark.sql.functions import to_timestamp, date_format
spark.conf.set('spark.sql.session.timeZone', 'UTC')
op_df = ip_df.select(
    date_format(
        to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
        "yyyy-MM-dd'T'HH:mm:ssZ"
    ).alias('timestamp_value'))
Run Code Online (Sandbox Code Playgroud)

产生:

+------------------------+
|timestamp_value         |
+------------------------+
|2017-08-01T09:00:00+0000|
|2017-08-01T08:00:00+0000|
|2017-08-01T07:00:00+0000|
+------------------------+
Run Code Online (Sandbox Code Playgroud)

或者,稍有不同:

op_df = ip_df.select(
    date_format(
        to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
        "yyyy-MM-dd'T'HH:mm:ssXXX"
    ).alias('timestamp_value'))
Run Code Online (Sandbox Code Playgroud)

产生:

+--------------------+
|timestamp_value     |
+--------------------+
|2017-08-01T09:00:00Z|
|2017-08-01T08:00:00Z|
|2017-08-01T07:00:00Z|
+--------------------+
Run Code Online (Sandbox Code Playgroud)


Fab*_*ich 5

您可以在 dateutil 库中使用解析器tz
我假设你有 Strings 并且你想要一个 String Column :

from dateutil import parser, tz
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf

# Create UTC timezone
utc_zone =  tz.gettz('UTC')

# Create UDF function that apply on the column
# It takes the String, parse it to a timestamp, convert to UTC, then convert to String again
func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(),  StringType())

# Create new column in your dataset
df = df.withColumn("new_timestamp",func(col("timestamp_value")))
Run Code Online (Sandbox Code Playgroud)

它给出了这个结果:

<pre>
+---+-------------------------+-------------------------+
|id |timestamp_value          |new_timestamp            |
+---+-------------------------+-------------------------+
|1  |2017-08-01T14:30:00+05:30|2017-08-01T09:00:00+00:00|
|2  |2017-08-01T14:30:00+06:30|2017-08-01T08:00:00+00:00|
|3  |2017-08-01T14:30:00+07:30|2017-08-01T07:00:00+00:00|
+---+-------------------------+-------------------------+
</pre>
Run Code Online (Sandbox Code Playgroud)

最后,您可以删除并重命名:

df = df.drop("timestamp_value").withColumnRenamed("new_timestamp","timestamp_value")
Run Code Online (Sandbox Code Playgroud)