gtu*_*rri 6 json apache-spark apache-spark-sql
TL;DR:当我将 Spark 转储DataFrame为 json 时,我总是会得到类似的结果
{"key1": "v11", "key2": "v21"}
{"key1": "v12", "key2": "v22"}
{"key1": "v13", "key2": "v23"}
Run Code Online (Sandbox Code Playgroud)
这是无效的 json。我可以手动编辑转储的文件以获得可以解析的内容:
[
{"key1": "v11", "key2": "v21"},
{"key1": "v12", "key2": "v22"},
{"key1": "v13", "key2": "v23"}
]
Run Code Online (Sandbox Code Playgroud)
但我很确定我错过了一些可以让我避免这种手动编辑的东西。我只是现在不知道什么。
更多细节:
我有一个org.apache.spark.sql.DataFrame,我尝试使用以下代码将其转储为 json:
myDataFrame.write.json("file.json")
Run Code Online (Sandbox Code Playgroud)
我也尝试过:
myDataFrame.toJSON.saveAsTextFile("file.json")
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,它最终都会正确转储每一行,但缺少行之间的分隔逗号以及方括号。因此,当我随后尝试解析该文件时,我使用的解析器会侮辱我,然后失败。
我将很高兴了解如何转储有效的 json。(阅读DataFrameWriter的文档没有给我提供任何有趣的提示。)
这是预期的输出。Spark 使用类似JSON Lines 的格式有多种原因:
RowSpark 中的结构体映射到 JSON 对象而不是数组。您可以通过多种方式创建所需的输出,但它总是会与上述方式之一发生冲突。
例如,您可以为每个分区编写一个 JSON文档:
import org.apache.spark.sql.functions._
df
.groupBy(spark_partition_id)
.agg(collect_list(struct(df.columns map col: _*)).alias("data"))
.select($"data")
.write
.json(output_path)
Run Code Online (Sandbox Code Playgroud)
您可以在前面加上此前缀repartition(1)以获得单个输出文件,但这不是您想要做的事情,除非数据非常小。
1.6 替代方案是 glom
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val newSchema = StructType(Seq(StructField("data", ArrayType(df.schema))))
sqlContext.createDataFrame(
df.rdd.glom.flatMap(a => if(a.isEmpty) Seq() else Seq(Row(a))),
newSchema
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2155 次 |
| 最近记录: |