按时间戳写入Spark分区数据

kpc*_*der 5 scala apache-spark apache-spark-sql

我有一些时间戳列字段很长且具有时代标准的数据,我需要使用Spark Scala将数据保存为拆分格式,例如yyyy / mm / dd / hh

data.write.partitionBy("timestamp").format("orc").save("mypath") 
Run Code Online (Sandbox Code Playgroud)

这只是按时间戳分割数据,如下所示

timestamp=1458444061098
timestamp=1458444061198
Run Code Online (Sandbox Code Playgroud)

但我希望它像

??? YYYY
    ??? MM
        ??? DD
            ??? HH
Run Code Online (Sandbox Code Playgroud)

Sil*_*vio 11

首先,我会警告你过度分区。也就是说,请确保您有足够的数据以使其值得按小时进行分区,否则您最终可能会得到许多带有小文件的分区文件夹。我要提出的第二个注意事项是使用分区层次结构(年/月/日/小时),因为它需要递归分区发现。

话虽如此,如果您确实想按小时进行分区,我建议将您的时间戳截断到小时到一个新列中并以此进行分区。然后,当你读回它时,Spark 将足够聪明地将格式识别为时间戳,并且你实际上可以根据需要执行完全过滤。

input
  .withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
  .write
  .partitionBy("ts_trunc")
  .save("/mnt/warehouse/part-test")

spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")
Run Code Online (Sandbox Code Playgroud)

另一种选择是按日期和一天中的小时进行分区,如下所示:

input
  .withColumn("date", to_date('timestamp))
  .withColumn("hour", hour('timestamp))
  .write
  .partitionBy("date", "hour")
  .save("/mnt/warehouse/part-test")
Run Code Online (Sandbox Code Playgroud)

  • 是的,这就是分区文件夹的工作原理,也是您想要的 (2认同)

Con*_*ine 7

您可以为此使用各种spark sql日期/时间函数。首先,添加一个新的日期类型列,该日期类型列是从unix时间戳列创建的。

val withDateCol = data
.withColumn("date_col", from_unixtime(col("timestamp", "YYYYMMddHH"))
Run Code Online (Sandbox Code Playgroud)

之后,您可以将年,月,日和小时列添加到DF,然后按这些新列进行分区以进行写入。

withDateCol
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.withColumn("hour", hour(col("date_col")))
.drop("date_col")
.partitionBy("year", "month", "day", "hour")
.format("orc")
.save("mypath") 
Run Code Online (Sandbox Code Playgroud)

partitionBy子句中包含的列将不属于文件架构。