读取与分区名称冲突的列的 Spark 数据

JBe*_*rdo 5 python apache-spark pyspark

我有以下文件路径,我们在 s3 上读取分区

prefix/company=abcd/service=xyz/date=2021-01-01/file_01.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_02.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_03.json
Run Code Online (Sandbox Code Playgroud)

当我用 pyspark 阅读这些内容时

self.spark \
    .read \
    .option("basePath", 'prefix') \
    .schema(self.schema) \
    .json(['company=abcd/service=xyz/date=2021-01-01/'])
Run Code Online (Sandbox Code Playgroud)

所有文件都具有相同的架构,并作为行加载到表中。一个文件可能是这样的:

{"id": "foo", "color": "blue", "date": "2021-12-12"}
Run Code Online (Sandbox Code Playgroud)

问题是有时文件的日期字段与我的分区代码冲突,例如date. 所以我想知道是否可以加载没有分区列的文件,重命名 JSON 日期列,然后添加分区列。

决赛桌为:

| id  | color | file_date  | company | service | date       |
-------------------------------------------------------------
| foo | blue  | 2021-12-12 | abcd    | xyz     | 2021-01-01 |
| bar | red   | 2021-10-10 | abcd    | xyz     | 2021-01-01 |
| baz | green | 2021-08-08 | abcd    | xyz     | 2021-01-01 |
Run Code Online (Sandbox Code Playgroud)

编辑:

更多信息:我有时有 5 或 6 个分区,日期是其中之一(不是最后一个)。我也需要一次读取多个日期分区。我传递给 Spark 的模式还包含分区列,这使得它更加复杂。

我不控制输入数据,所以我需要按原样读取。我可以重命名文件列,但不能重命名分区列。

是否可以像加入 2 个数据帧时那样向文件列添加别名?

火花3.1

bla*_*hop 3

一种方法是使用Hadoop FS APIprefix等列出 S3 路径下的文件,然后将该列表传递给. 这样 Spark 就不会将它们检测为分区,并且您可以根据需要重命名文件列。spark.read

将文件加载到数据帧后,循环遍历 df 列并重命名partitions_colums列表中也存在的那些(file例如通过添加前缀)。

input_file_name()最后,从using函数中解析分区regexp_extract

这是一个例子:

from pyspark.sql import functions as F

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()

s3_path = "s3://bucket/prefix"
file_cols = ["id", "color", "date"]
partitions_cols = ["company", "service", "date"]

# listing all files for input path
json_files = []
files = Path(s3_path).getFileSystem(conf).listFiles(Path(s3_path), True)

while files.hasNext():
    path = files.next().getPath()
    if path.getName().endswith(".json"):
        json_files.append(path.toString())

df = spark.read.json(json_files) # you can pass here the schema of the files without the partition columns

# renaming file column in if exists in partitions
df = df.select(*[
    F.col(c).alias(c) if c not in partitions_cols else F.col(c).alias(f"file_{c}")
    for c in df.columns
])

# parse partitions from filenames
for p in partitions_cols:
    df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))

df.show()

#+-----+----------+---+-------+-------+----------+
#|color| file_date| id|company|service|      date|
#+-----+----------+---+-------+-------+----------+
#|green|2021-08-08|baz|   abcd|    xyz|2021-01-01|
#| blue|2021-12-12|foo|   abcd|    xyz|2021-01-01|
#|  red|2021-10-10|bar|   abcd|    xyz|2021-01-01|
#+-----+----------+---+-------+-------+----------+
Run Code Online (Sandbox Code Playgroud)