JBe*_*rdo 5 python apache-spark pyspark
我有以下文件路径,我们在 s3 上读取分区
prefix/company=abcd/service=xyz/date=2021-01-01/file_01.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_02.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_03.json
Run Code Online (Sandbox Code Playgroud)
当我用 pyspark 阅读这些内容时
self.spark \
.read \
.option("basePath", 'prefix') \
.schema(self.schema) \
.json(['company=abcd/service=xyz/date=2021-01-01/'])
Run Code Online (Sandbox Code Playgroud)
所有文件都具有相同的架构,并作为行加载到表中。一个文件可能是这样的:
{"id": "foo", "color": "blue", "date": "2021-12-12"}
Run Code Online (Sandbox Code Playgroud)
问题是有时文件的日期字段与我的分区代码冲突,例如date. 所以我想知道是否可以加载没有分区列的文件,重命名 JSON 日期列,然后添加分区列。
决赛桌为:
| id | color | file_date | company | service | date |
-------------------------------------------------------------
| foo | blue | 2021-12-12 | abcd | xyz | 2021-01-01 |
| bar | red | 2021-10-10 | abcd | xyz | 2021-01-01 |
| baz | green | 2021-08-08 | abcd | xyz | 2021-01-01 |
Run Code Online (Sandbox Code Playgroud)
编辑:
更多信息:我有时有 5 或 6 个分区,日期是其中之一(不是最后一个)。我也需要一次读取多个日期分区。我传递给 Spark 的模式还包含分区列,这使得它更加复杂。
我不控制输入数据,所以我需要按原样读取。我可以重命名文件列,但不能重命名分区列。
是否可以像加入 2 个数据帧时那样向文件列添加别名?
火花3.1
一种方法是使用Hadoop FS APIprefix等列出 S3 路径下的文件,然后将该列表传递给. 这样 Spark 就不会将它们检测为分区,并且您可以根据需要重命名文件列。spark.read
将文件加载到数据帧后,循环遍历 df 列并重命名partitions_colums列表中也存在的那些(file例如通过添加前缀)。
input_file_name()最后,从using函数中解析分区regexp_extract。
这是一个例子:
from pyspark.sql import functions as F
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()
s3_path = "s3://bucket/prefix"
file_cols = ["id", "color", "date"]
partitions_cols = ["company", "service", "date"]
# listing all files for input path
json_files = []
files = Path(s3_path).getFileSystem(conf).listFiles(Path(s3_path), True)
while files.hasNext():
path = files.next().getPath()
if path.getName().endswith(".json"):
json_files.append(path.toString())
df = spark.read.json(json_files) # you can pass here the schema of the files without the partition columns
# renaming file column in if exists in partitions
df = df.select(*[
F.col(c).alias(c) if c not in partitions_cols else F.col(c).alias(f"file_{c}")
for c in df.columns
])
# parse partitions from filenames
for p in partitions_cols:
df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))
df.show()
#+-----+----------+---+-------+-------+----------+
#|color| file_date| id|company|service| date|
#+-----+----------+---+-------+-------+----------+
#|green|2021-08-08|baz| abcd| xyz|2021-01-01|
#| blue|2021-12-12|foo| abcd| xyz|2021-01-01|
#| red|2021-10-10|bar| abcd| xyz|2021-01-01|
#+-----+----------+---+-------+-------+----------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3291 次 |
| 最近记录: |