enn*_*ppi 5 python json apache-spark pyspark
我想将我的spark数据框编写为一组JSON文件,尤其是将每个文件都编写为JSON数组。让我用一个简单的(可重现的)代码进行解释。
我们有:
import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))
Run Code Online (Sandbox Code Playgroud)
将数据框另存为:
df.write.json('s3://path/to/json')
Run Code Online (Sandbox Code Playgroud)
刚创建的每个文件每行都有一个JSON对象,如下所示:
{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}
Run Code Online (Sandbox Code Playgroud)
但我想每个文件都有一个JSON数组:
[
{"x":0.9953802385540144,"y":0.476027611419198},
{"x":0.929599290575914,"y":0.72878523939521},
{"x":0.951701684432855,"y":0.8008064729546504}
]
Run Code Online (Sandbox Code Playgroud)
目前不可能让 Spark 以您想要的格式“本地”写入单个文件,因为 Spark 以分布式(并行)方式工作,每个执行器独立写入其部分数据。
但是,由于您可以让每个文件成为 json 数组而不仅仅是 [one] file,因此您可以使用以下一种解决方法来实现所需的输出:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct
df.select(to_json(struct(*df.columns)).alias("json"))\
.groupBy(spark_partition_id())\
.agg(collect_list("json").alias("json_list"))\
.select(col("json_list").cast("string"))\
.write.text("s3://path/to/json")
Run Code Online (Sandbox Code Playgroud)
首先,您json从 中的所有列创建一个df。然后按 Spark 分区 ID 进行分组并使用 进行聚合collect_list。这会将该json分区上的所有 s 放入一个列表中。由于您是在分区内聚合,因此不需要重新整理数据。
现在选择列表列,转换为字符串,并将其写入文本文件。
以下是一个文件的外观示例:
[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]
Run Code Online (Sandbox Code Playgroud)
请注意,您可能会得到一些空文件。
如果您指定了一个空的文件,那么您可能可以强制 Spark 将数据写入一个文件中groupBy,但这会导致将所有数据强制写入单个分区,从而可能导致内存不足错误。
| 归档时间: |
|
| 查看次数: |
102 次 |
| 最近记录: |