And*_*hoi 1 apache-spark apache-spark-sql pyspark pyspark-sql
我有如下制表符分隔的数据(csv 文件):
201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
Run Code Online (Sandbox Code Playgroud)
我想按年、月、日、小时编写目录组。
hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
Run Code Online (Sandbox Code Playgroud)
如何按 yyyy/mm/dd/hh 写入分区?
partitionByDataFrameWriter 中已经存在,它可以完全满足您的需求,而且要简单得多。此外,还有从时间戳中提取日期部分的函数。
这是您可以考虑的另一种解决方案。
由于您的 CSV 没有标题,您可以在加载时应用自定义标题,这样以后可以轻松操作列:
custom_header = "timestamp\tvalue"
schema = StructType()
col_names = custom_header.split("\t")
for c in col_names:
schema.add(StructField(c.strip(), StringType()))
df = spark.read.csv("hdfs://sample.csv", header=False, sep="\t", schema=schema)
Run Code Online (Sandbox Code Playgroud)
现在,创建列year,month,day,hour从列timestamp如下:
df_final = df.withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyyMMddHHmm')) \
.withColumn("year", date_format(col("timestamp"), "yyyy")) \
.withColumn("month", date_format(col("timestamp"), "MM")) \
.withColumn("day", date_format(col("timestamp"), "dd")) \
.withColumn("hour", date_format(col("timestamp"), "HH")) \
.drop("timestamp")
df_final.show(truncate=False)
+-----+----+-----+---+----+
|value|year|month|day|hour|
+-----+----+-----+---+----+
|a |2019|11 |24 |01 |
|b |2019|11 |25 |01 |
|c |2019|11 |25 |01 |
|z |2019|11 |25 |02 |
|d |2019|11 |25 |02 |
+-----+----+-----+---+----+
Run Code Online (Sandbox Code Playgroud)
最后,使用partitionBy如下方式将 DF 写入目标路径:
df_final.write.partitionBy("year", "month", "day", "hour") \
.mode("overwrite") \
.option("header", "false").option("sep", "\t") \
.csv("hdfs://dest/")
Run Code Online (Sandbox Code Playgroud)
将在/dest/文件夹下创建分区。
| 归档时间: |
|
| 查看次数: |
3031 次 |
| 最近记录: |