读取没有分区列名称的分区列

KOB*_*KOB 3 amazon-s3 partition apache-spark parquet pyspark

我们将数据存储在 s3 中,分区结构如下:

bucket/directory/table/aaaa/bb/cc/dd/
Run Code Online (Sandbox Code Playgroud)

其中aaaa是年,bb是月,cc是日,dd是小时。

如您所见,路径中没有分区键(year=aaaa, month=bb, day=cc, hour=dd).

结果,当我将表读入 Spark 时,没有yearmonthdayhour列。

无论如何,我可以将表读入 Spark 并包含分区列,而无需

  • 更改 s3 中的路径名
  • 在循环中迭代每个分区值,并将每个分区一一读取到 Spark 中(这是一个巨大的表,这需要太长的时间,并且显然不是最佳的)。

bla*_*hop 6

Spark 无法发现未按partition_name=value路径编码的分区,因此您必须创建它们。

将路径加载bucket/directory/table/aaaa/bb/cc/dd/到 DataFrame 中后,您可以从使用 . 获得的源文件名中提取这些分区input_file_name()

首先,使用分隔符分割文件名路径/,然后从最后 4 个元素创建列:

from pyspark.sql import functions as F

df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) \
    .withColumn("year", F.col("date_partitions").getItem(0)) \
    .withColumn("month", F.col("date_partitions").getItem(1)) \
    .withColumn("day", F.col("date_partitions").getItem(2)) \
    .withColumn("hour", F.col("date_partitions").getItem(3)) \
    .drop("data_partitions")
Run Code Online (Sandbox Code Playgroud)

例子:

data = [
    (1, 2, "bucket/directory/table/2021/01/10/14/"),
    (3, 4, "bucket/directory/table/2021/01/11/18/")
]

df = spark.createDataFrame(data, ["a", "b", "input_file_name"])
Run Code Online (Sandbox Code Playgroud)

给出:

#+---+---+-------------------------------------+----+-----+---+----+
#|a  |b  |input_file_name                      |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1  |2  |bucket/directory/table/2021/01/10/14/|2021|01   |10 |14  |
#|3  |4  |bucket/directory/table/2021/01/11/18/|2021|01   |11 |18  |
#+---+---+-------------------------------------+----+-----+---+----+
Run Code Online (Sandbox Code Playgroud)