kpc*_*der 5 scala apache-spark apache-spark-sql
我有一些时间戳列字段很长且具有时代标准的数据,我需要使用Spark Scala将数据保存为拆分格式,例如yyyy / mm / dd / hh
data.write.partitionBy("timestamp").format("orc").save("mypath")
Run Code Online (Sandbox Code Playgroud)
这只是按时间戳分割数据,如下所示
timestamp=1458444061098
timestamp=1458444061198
Run Code Online (Sandbox Code Playgroud)
但我希望它像
??? YYYY
??? MM
??? DD
??? HH
Run Code Online (Sandbox Code Playgroud)
Sil*_*vio 11
首先,我会警告你过度分区。也就是说,请确保您有足够的数据以使其值得按小时进行分区,否则您最终可能会得到许多带有小文件的分区文件夹。我要提出的第二个注意事项是使用分区层次结构(年/月/日/小时),因为它需要递归分区发现。
话虽如此,如果您确实想按小时进行分区,我建议将您的时间戳截断到小时到一个新列中并以此进行分区。然后,当你读回它时,Spark 将足够聪明地将格式识别为时间戳,并且你实际上可以根据需要执行完全过滤。
input
.withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
.write
.partitionBy("ts_trunc")
.save("/mnt/warehouse/part-test")
spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")
Run Code Online (Sandbox Code Playgroud)
另一种选择是按日期和一天中的小时进行分区,如下所示:
input
.withColumn("date", to_date('timestamp))
.withColumn("hour", hour('timestamp))
.write
.partitionBy("date", "hour")
.save("/mnt/warehouse/part-test")
Run Code Online (Sandbox Code Playgroud)
您可以为此使用各种spark sql日期/时间函数。首先,添加一个新的日期类型列,该日期类型列是从unix时间戳列创建的。
val withDateCol = data
.withColumn("date_col", from_unixtime(col("timestamp", "YYYYMMddHH"))
Run Code Online (Sandbox Code Playgroud)
之后,您可以将年,月,日和小时列添加到DF,然后按这些新列进行分区以进行写入。
withDateCol
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.withColumn("hour", hour(col("date_col")))
.drop("date_col")
.partitionBy("year", "month", "day", "hour")
.format("orc")
.save("mypath")
Run Code Online (Sandbox Code Playgroud)
partitionBy子句中包含的列将不属于文件架构。