pyspark:如何按年/月/日/小时子目录编写数据帧分区?

And*_*hoi 1 apache-spark apache-spark-sql pyspark pyspark-sql

我有如下制表符分隔的数据(csv 文件):

201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
Run Code Online (Sandbox Code Playgroud)

我想按年、月、日、小时编写目录组。

hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
Run Code Online (Sandbox Code Playgroud)

如何按 yyyy/mm/dd/hh 写入分区?

bla*_*hop 5

partitionByDataFrameWriter 中已经存在,它可以完全满足您的需求,而且要简单得多。此外,还有从时间戳中提取日期部分的函数。

这是您可以考虑的另一种解决方案。

由于您的 CSV 没有标题,您可以在加载时应用自定义标题,这样以后可以轻松操作列:

custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
    schema.add(StructField(c.strip(), StringType()))

df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)
Run Code Online (Sandbox Code Playgroud)

现在,创建列yearmonthdayhour从列timestamp如下:

df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
           .withColumn("year", date_format(col("timestamp"), "yyyy")) \
           .withColumn("month", date_format(col("timestamp"), "MM")) \
           .withColumn("day", date_format(col("timestamp"), "dd")) \
           .withColumn("hour", date_format(col("timestamp"), "HH")) \
           .drop("timestamp")

df_final.show(truncate=False)

+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a    |2019|11   |24 |01  |
|b    |2019|11   |25 |01  |
|c    |2019|11   |25 |01  |
|z    |2019|11   |25 |02  |
|d    |2019|11   |25 |02  |
+-----+----+-----+---+----+
Run Code Online (Sandbox Code Playgroud)

最后,使用partitionBy如下方式将 DF 写入目标路径:

df_final.write.partitionBy("year", "month", "day", "hour") \
    .mode("overwrite") \
    .option("header", "false").option("sep", "\t") \
    .csv("hdfs://dest/")
Run Code Online (Sandbox Code Playgroud)

将在/dest/文件夹下创建分区。