Spark DataFrame 序列化为无效 json

gtu*_*rri 6 json apache-spark apache-spark-sql

TL;DR:当我将 Spark 转储DataFrame为 json 时,我总是会得到类似的结果

{"key1": "v11", "key2": "v21"}
{"key1": "v12", "key2": "v22"}
{"key1": "v13", "key2": "v23"}
Run Code Online (Sandbox Code Playgroud)

这是无效的 json。我可以手动编辑转储的文件以获得可以解析的内容:

[
  {"key1": "v11", "key2": "v21"},
  {"key1": "v12", "key2": "v22"},
  {"key1": "v13", "key2": "v23"}
]
Run Code Online (Sandbox Code Playgroud)

但我很确定我错过了一些可以让我避免这种手动编辑的东西。我只是现在不知道什么。

更多细节

我有一个org.apache.spark.sql.DataFrame,我尝试使用以下代码将其转储为 json:

myDataFrame.write.json("file.json")
Run Code Online (Sandbox Code Playgroud)

我也尝试过:

myDataFrame.toJSON.saveAsTextFile("file.json")
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,它最终都会正确转储每一行,但缺少行之间的分隔逗号以及方括号。因此,当我随后尝试解析该文件时,我使用的解析器会侮辱我,然后失败。

我将很高兴了解如何转储有效的 json。(阅读DataFrameWriter的文档没有给我提供任何有趣的提示。)

hi-*_*zir 2

这是预期的输出。Spark 使用类似JSON Lines 的格式有多种原因:

  • 它可以并行解析和加载。
  • 无需将完整文件加载到内存中即可完成解析。
  • 可以并行编写。
  • 无需在内存中存储完整分区即可写入。
  • 即使文件为空,输入也是有效的。
  • 最后,RowSpark 中的结构体映射到 JSON 对象而不是数组。
  • ...

您可以通过多种方式创建所需的输出,但它总是会与上述方式之一发生冲突。

例如,您可以为每个分区编写一个 JSON文档:

import org.apache.spark.sql.functions._

df
  .groupBy(spark_partition_id)
  .agg(collect_list(struct(df.columns map col: _*)).alias("data"))
  .select($"data")
  .write
  .json(output_path)
Run Code Online (Sandbox Code Playgroud)

您可以在前面加上此前缀repartition(1)以获得单个输出文件,但这不是您想要做的事情,除非数据非常小。

1.6 替代方案是 glom

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val newSchema = StructType(Seq(StructField("data", ArrayType(df.schema))))

sqlContext.createDataFrame(
  df.rdd.glom.flatMap(a => if(a.isEmpty) Seq() else Seq(Row(a))), 
  newSchema
)
Run Code Online (Sandbox Code Playgroud)