parquet int96时间戳通过python转换为日期时间/日期

Dou*_*esh 6 python timestamp casting apache-spark parquet

长话短说

我想将int96 值(例如 ACIE4NxJAAAKhSUA)转换可读的时间戳格式(例如2020-03-02 14:34:22或任何可以正常解释的格式)...我主要使用 python,所以我希望构建一个执行此转换的函数。如果有另一个函数可以起到相反的作用,那就更好了。

背景

我正在使用 parquet-tools 通过以下命令将原始 parquet 文件(使用快速压缩)转换为原始 JSON:

C:\Research> java -jar parquet-tools-1.8.2.jar cat --json original-file.snappy.parquet > parquet-output.json
Run Code Online (Sandbox Code Playgroud)

在 JSON 中,我将这些值视为时间戳:

{... "_id":"101836","timestamp":"ACIE4NxJAAAKhSUA"}
Run Code Online (Sandbox Code Playgroud)

我已经确定“ACIE4NxJAAAKhSUA”的时间戳值确实是 int96 (这也通过读取镶木地板文件的架构得到了证实......

message spark_schema {
 ...(stuff)...
  optional binary _id (UTF8);
  optional int96 timestamp;
}
Run Code Online (Sandbox Code Playgroud)

我认为这也被称为 Impala Timestamp (至少这是我收集到的)

进一步的问题研究

我一直在到处寻找一些关于如何“读取” int96 值(到 python 中——我想将其保留在该语言中,因为我最熟悉它)并输出时间戳的函数或信息——我什么也没找到。

这是我已经研究过的一篇文章(与该主题相关):

  • ParquetWriter 在 SO 中的研究在这里
  • 通过 golan 在 SO中铸造 int96注意:这有一个我可以探索的功能,但我不知道如何深入研究

关于贬值的 int96 时间戳

请不要要求我停止在 parquet 文件中使用旧的/已贬值的时间戳格式,通过迄今为止所做的研究,我很清楚这一点。我是文件/数据的接收者——我无法更改创建时使用的格式。

如果有另一种方法来控制初始 JSON 输出以提供“非 int96”值——我也会对此感兴趣。

非常感谢您对社区的帮助!

Ang*_*nda 6

parquet-tools 将无法将格式类型从 INT96 更改为 INT64。您在 json 输出中观察到的是存储在 INT96 TimestampType 中的时间戳的字符串表示形式。您将需要 Spark 使用 INT64 TimestampType 中的时间戳重写此镶木地板,然后 json 输出将生成时间戳(以您想要的格式)。

您需要在 Spark 中设置特定的配置 -

spark-shell --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MICROS

2020-03-16 11:37:50 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.20:4040
Spark context available as 'sc' (master = local[*], app id = local-1584383875924).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

val sourceDf = spark.read.parquet("original-file.snappy.parquet")
2020-03-16 11:38:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
sourceDf: org.apache.spark.sql.DataFrame = [application: struct<name: string, upgrades: struct<value: double> ... 3 more fields>, timestamp: timestamp ... 16 more fields]

scala> sourceDf.repartition(1).write.parquet("Downloads/output")
Run Code Online (Sandbox Code Playgroud)

Parquet-tools 将显示正确的 TimestampType

parquet-tools schema Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet 

message spark_schema {
  ...
  optional binary _id (UTF8);
  optional int64 timestamp (TIMESTAMP_MICROS);
  ...
}
Run Code Online (Sandbox Code Playgroud)

json 转储给出 -

parquet-tools cat --json Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet

{..."_id":"101836", "timestamp":1583973827000000}
Run Code Online (Sandbox Code Playgroud)

记录的时间戳以纳秒为单位。希望这可以帮助!