使用 Spark 2 从 json 解析纪元毫秒

dla*_*lin 5 parsing json apache-spark

from_json有人在 Spark 2+ 中解析过毫秒时间戳吗?是怎么做到的?

\n\n

因此Spark 将TimestampTypev2 中的 epoch 数值解析为以秒为单位,而不是以毫秒为单位。

\n\n

我的输入是一个配置单元表,其中的一列中有一个 json 格式的字符串,我尝试像这样解析:

\n\n
val spark = SparkSession\n  .builder\n  .appName("Problematic Timestamps")\n  .enableHiveSupport()\n  .getOrCreate()\nimport spark.implicits._\nval schema = StructType(\n  StructField("categoryId", LongType) ::\n  StructField("cleared", BooleanType) ::\n  StructField("dataVersion", LongType) ::\n  StructField("details", DataTypes.createArrayType(StringType)) ::\n  \xe2\x80\xa6\n  StructField("timestamp", TimestampType) ::\n  StructField("version", StringType) :: Nil\n)\nval item_parsed =\n    spark.sql("select * FROM source.jsonStrInOrc")\n    .select(\'itemid, \'locale,\n            from_json(\'internalitem, schema)\n                as \'internalitem,\n            \'version, \'createdat, \'modifiedat)\nval item_flattened = item_parsed\n    .select(\'itemid, \'locale,\n            $"internalitem.*",\n            \'version as\'outer_version, \'createdat, \'modifiedat)\n
Run Code Online (Sandbox Code Playgroud)\n\n

这可以解析具有包含以下内容的列的行:

\n\n
\n

{“时间戳”:1494790299549,“已清除”:false,“版本”:“V1”,“数据版本”:2,“categoryId”:2641,“详细信息”:[],\ xe2 \ x80 \ xa6}

\n
\n\n

这给了我timestamp一些字段,比如我宁愿读为的49338-01-08 00:39:09.0值:14947902995492017-05-14 19:31:39.549

\n\n

现在我可以将时间戳的模式设置为 long,然后将该值除以 1000 并转换为时间戳,但随后我就没有2017-05-14 19:31:39.0002017-05-14 19:31:39.549。我无法弄清楚我该如何:

\n\n
    \n
  • 告诉from_json解析毫秒时间戳(也许通过以某种方式对 TimestampType 进行子类化以在模式中使用)
  • \n
  • LongType在模式中使用 a并将其转换为保留毫秒的时间戳
  • \n
\n\n

UDF 附录

\n\n

我发现尝试在选择中进行除法然后进行转换对我来说看起来并不干净,尽管这是一个完全有效的方法。我选择了一个使用 a 的 UDF java.sql.timestamp,它实际上以纪元毫秒为单位指定。

\n\n
import java.sql.Timestamp\n\nimport org.apache.spark.sql.SparkSession\nimport org.apache.spark.sql.functions.{explode, from_json, udf}\nimport org.apache.spark.sql.types.\n{BooleanType, DataTypes, IntegerType, LongType,\nStringType, StructField, StructType, TimestampType}\n\nval tsmillis = udf { t: Long => new Timestamp (t) }\n\nval spark = SparkSession\n  .builder\n  .appName("Problematic Timestamps")\n  .enableHiveSupport()\n  .getOrCreate()\nimport spark.implicits._\nval schema = StructType(\n  StructField("categoryId", LongType) ::\n  StructField("cleared", BooleanType) ::\n  StructField("dataVersion", LongType) ::\n  StructField("details", DataTypes.createArrayType(StringType)) ::\n  \xe2\x80\xa6\n  StructField("timestamp", LongType) ::\n  StructField("version", StringType) :: Nil\n)\nval item_parsed =\n    spark.sql("select * FROM source.jsonStrInOrc")\n    .select(\'itemid, \'locale,\n            from_json(\'internalitem, schema)\n                as \'internalitem,\n            \'version, \'createdat, \'modifiedat)\nval item_flattened = item_parsed\n    .select(\'itemid, \'locale,\n            $"internalitem.categoryId", $"internalitem.cleared",\n            $"internalitem.dataVersion", $"internalitem.details",\n            tsmillis($"internalitem.timestamp"),\n            $"internalitem.version",\n            \'version as\'outer_version, \'createdat, \'modifiedat)\n
Run Code Online (Sandbox Code Playgroud)\n\n

看看选择中的情况如何。\n我认为值得进行性能测试,看看使用withcolumn除法和转换是否比udf.

\n

zer*_*323 4

现在我可以将时间戳的模式设置为 long,然后将该值除以 1000

实际上这正是您所需要的,只需保持类型正确即可。假设您只有一个Long timestamp字段:

val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint]
Run Code Online (Sandbox Code Playgroud)

如果除以 1000:

val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]
Run Code Online (Sandbox Code Playgroud)

您将获得以秒为单位的双精度时间戳(请注意,这是 SQL,而不是 Scala 行为)。

剩下的就是castSpark < 3.1

inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
// +-----------------------+
// |timestamp_seconds      |
// +-----------------------+
// |2017-05-14 21:31:39.549|
// +-----------------------+
Run Code Online (Sandbox Code Playgroud)

或 ( Spark >= 3.1 ) timestamp_seconds(或直接timestamp_millis

import org.apache.spark.sql.functions.{expr, timestamp_seconds}

inSeconds.select(timestamp_seconds($"timestamp_seconds")).show(false)

// +------------------------------------+
// |timestamp_seconds(timestamp_seconds)|
// +------------------------------------+
// |2017-05-14 21:31:39.549             |
// +------------------------------------+

df.select(expr("timestamp_millis(timestamp)")).show(false)
// +---------------------------+
// |timestamp_millis(timestamp)|
// +---------------------------+
// |2017-05-14 21:31:39.549    |
// +---------------------------+
Run Code Online (Sandbox Code Playgroud)