Dou*_*esh 6 python timestamp casting apache-spark parquet
我想将int96 值(例如 ACIE4NxJAAAKhSUA)转换为可读的时间戳格式(例如2020-03-02 14:34:22或任何可以正常解释的格式)...我主要使用 python,所以我希望构建一个执行此转换的函数。如果有另一个函数可以起到相反的作用,那就更好了。
我正在使用 parquet-tools 通过以下命令将原始 parquet 文件(使用快速压缩)转换为原始 JSON:
C:\Research> java -jar parquet-tools-1.8.2.jar cat --json original-file.snappy.parquet > parquet-output.json
Run Code Online (Sandbox Code Playgroud)
在 JSON 中,我将这些值视为时间戳:
{... "_id":"101836","timestamp":"ACIE4NxJAAAKhSUA"}
Run Code Online (Sandbox Code Playgroud)
我已经确定“ACIE4NxJAAAKhSUA”的时间戳值确实是 int96 (这也通过读取镶木地板文件的架构得到了证实......
message spark_schema {
...(stuff)...
optional binary _id (UTF8);
optional int96 timestamp;
}
Run Code Online (Sandbox Code Playgroud)
我认为这也被称为 Impala Timestamp (至少这是我收集到的)
我一直在到处寻找一些关于如何“读取” int96 值(到 python 中——我想将其保留在该语言中,因为我最熟悉它)并输出时间戳的函数或信息——我什么也没找到。
这是我已经研究过的一篇文章(与该主题相关):
请不要要求我停止在 parquet 文件中使用旧的/已贬值的时间戳格式,通过迄今为止所做的研究,我很清楚这一点。我是文件/数据的接收者——我无法更改创建时使用的格式。
如果有另一种方法来控制初始 JSON 输出以提供“非 int96”值——我也会对此感兴趣。
非常感谢您对社区的帮助!
parquet-tools 将无法将格式类型从 INT96 更改为 INT64。您在 json 输出中观察到的是存储在 INT96 TimestampType 中的时间戳的字符串表示形式。您将需要 Spark 使用 INT64 TimestampType 中的时间戳重写此镶木地板,然后 json 输出将生成时间戳(以您想要的格式)。
您需要在 Spark 中设置特定的配置 -
spark-shell --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MICROS
2020-03-16 11:37:50 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.20:4040
Spark context available as 'sc' (master = local[*], app id = local-1584383875924).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
val sourceDf = spark.read.parquet("original-file.snappy.parquet")
2020-03-16 11:38:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
sourceDf: org.apache.spark.sql.DataFrame = [application: struct<name: string, upgrades: struct<value: double> ... 3 more fields>, timestamp: timestamp ... 16 more fields]
scala> sourceDf.repartition(1).write.parquet("Downloads/output")
Run Code Online (Sandbox Code Playgroud)
Parquet-tools 将显示正确的 TimestampType
parquet-tools schema Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
message spark_schema {
...
optional binary _id (UTF8);
optional int64 timestamp (TIMESTAMP_MICROS);
...
}
Run Code Online (Sandbox Code Playgroud)
json 转储给出 -
parquet-tools cat --json Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet
{..."_id":"101836", "timestamp":1583973827000000}
Run Code Online (Sandbox Code Playgroud)
记录的时间戳以纳秒为单位。希望这可以帮助!