Spark Strutured Streaming自动将时间戳转换为本地时间

Mar*_*iak 18 java scala apache-spark apache-spark-sql spark-structured-streaming

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间.有没有办法阻止这种转换?我想在UTC中使用它.

我正在从Kafka读取json数据,然后使用from_jsonSpark函数解析它们.

输入:

{"Timestamp":"2015-01-01T00:00:06.222Z"}
Run Code Online (Sandbox Code Playgroud)

流:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();
Run Code Online (Sandbox Code Playgroud)

架构:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
Run Code Online (Sandbox Code Playgroud)

输出:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

如您所见,小时数自动增加.

PS:我试着尝试from_utc_timestampSpark功能,但没有运气.

ast*_*asz 29

对我来说它起作用了:

spark.conf.set("spark.sql.session.timeZone", "UTC")
Run Code Online (Sandbox Code Playgroud)

它告诉spark SQL使用UTC作为时间戳的默认时区.我在spark SQL中使用它,例如:

select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
Run Code Online (Sandbox Code Playgroud)

我知道它在2.0.1中不起作用.但适用于Spark 2.2.我SQLTransformer也用过它而且很有效.

我不确定流媒体.


use*_*411 16

注意:

这个答案主要用于Spark <2.2.对于新版本的Spark看到答案通过ASTRO-ASZ

但是我们应该注意到,截至今天(Spark 2.4.0),spark.sql.session.timeZone没有设置user.timezone(java.util.TimeZone.getDefault).因此,单独设置``spark.sql.session.timeZone`可能会导致SQL和非SQL组件使用不同时区设置的相当尴尬的情况.

因此,我仍然建议user.timezone明确设置,即使spark.sql.session.timeZone已设置.

TL; DR不幸的是,这就是Spark现在处理时间戳的方式,除了直接在纪元时间上运行之外,实际上没有内置替代方案,而不使用日期/时间实用程序.

您可以对Spark开发人员列表进行深入的讨论:SQL TIMESTAMP语义与SPARK-18350

到目前为止,我已经找到了干净的解决方法是设置-Duser.timezoneUTC驾驶者和执行者两种.例如,提交:

bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
Run Code Online (Sandbox Code Playgroud)

或者通过调整配置文件(spark-defaults.conf):

spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC
Run Code Online (Sandbox Code Playgroud)