How to write valid JSON in Spark

pav*_*dav 1 json apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

I need to write valid JSON, but Spark writes one JSON object per line, for example:

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}

The file above is not a valid JSON document. Instead, I need this:

[
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
]

How can I do this in Java?

Pio*_*ski 5

First, convert the DataFrame rows to JSON strings with toJSON:

Scala

val jsonDs = df.toJSON

Java

Dataset<String> jsonDs = df.toJSON(); // df is the Dataset<Row> to export

Scala example:

case class Data(name: String, age: Int)
case class DataObj(id: String, seq: Seq[Data])

val df = session.createDataFrame(Seq(
 DataObj("1", Seq(Data("n1", 1), Data("n2", 2))),
 DataObj("2", Seq(Data("n1", 1), Data("n2", 2), Data("n3", 3))),
 DataObj("3", Seq(Data("n1", 1))),
 DataObj("4", Seq(Data("n4", 44))),
 DataObj("5", Seq(Data("n5", 55)))
))

val jsonDs = df.toJSON

The next step depends on whether you want to save everything to a single file or to multiple files, one per partition.

Save to a single JSON file

Scala

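val count = jsonDs.count()
jsonDs
  .repartition(1) // make sure there is only one partition and, in consequence, one output file
  .rdd
  .zipWithIndex()
  .map { case (json, idx) =>
    val prefix = if (idx == 0) "[\n" else ""          // opening bracket before the first row
    val suffix = if (idx == count - 1) "\n]" else "," // closing bracket after the last row, comma otherwise
    prefix + json + suffix                           // a single row gets both brackets
  }
  .saveAsTextFile("path") // "path" is a directory; an empty Dataset writes no rows and therefore no brackets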
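The index-based bracketing in the map step can be tried out without a cluster. The sketch below is plain Java (the language the question asks about) and replaces the zipWithIndex + map pipeline with a loop over a java.util.List; the class and method names are hypothetical. It also covers the single-row case, where one element must receive both the opening and the closing bracket, and it assumes that "[]" is the desired output for an empty input.

```java
import java.util.ArrayList;
import java.util.List;

public class JsonArrayLines {
    // Mirrors the per-row map step: the first row gets "[\n" in front, the last
    // row gets "\n]" after it, every other row gets a trailing ",".
    // A single row (idx == 0 == count - 1) gets both brackets.
    static String decorate(String json, long idx, long count) {
        String prefix = idx == 0 ? "[\n" : "";
        String suffix = idx == count - 1 ? "\n]" : ",";
        return prefix + json + suffix;
    }

    // Stands in for zipWithIndex + map over a list of JSON-object strings.
    // Assumption: an empty input should still produce an (empty) JSON array.
    static List<String> toArrayLines(List<String> rows) {
        List<String> out = new ArrayList<>();
        if (rows.isEmpty()) {
            out.add("[]");
            return out;
        }
        long count = rows.size();
        for (int i = 0; i < rows.size(); i++) {
            out.add(decorate(rows.get(i), i, count));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("{\"name\":\"Yin\"}", "{\"name\":\"Michael\"}");
        // Joining with newlines gives the text a single part file would contain.
        System.out.println(String.join("\n", toArrayLines(rows)));
    }
}
```

Running main prints "[" on the first line, {"name":"Yin"}, on the second, {"name":"Michael"} on the third and "]" on the last, which parses as one JSON array.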

Java

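long count = jsonDs.count(); // needed to recognise the last row
jsonDs
  .repartition(1) // make sure there is only one partition and, in consequence, one output file
  .javaRDD()
  .zipWithIndex()
  // t._1() is the JSON string, t._2() its index; a single row gets both brackets
  .map(t -> (t._2() == 0 ? "[\n" : "") + t._1() + (t._2() == count - 1 ? "\n]" : ","))
  .saveAsTextFile("path");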
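If the whole result is small enough to fit in driver memory, a different route (not part of the original answer) is to pull the JSON strings to the driver with jsonDs.collectAsList() and write one ordinary file with java.nio, which avoids the part-file directory entirely. The sketch assumes the list has already been collected; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class JsonArrayFile {
    // Builds one JSON array from JSON-object strings, one element per line.
    // An empty list yields "[\n\n]", which is still a valid (empty) array.
    static String toJsonArray(List<String> rows) {
        return "[\n" + String.join(",\n", rows) + "\n]";
    }

    // Writes the array to a single file.
    // Assumption: rows came from jsonDs.collectAsList() and fit in driver memory.
    static void writeJsonArray(List<String> rows, Path file) throws IOException {
        Files.write(file, toJsonArray(rows).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("data", ".json");
        writeJsonArray(List.of("{\"name\":\"Yin\"}", "{\"name\":\"Michael\"}"), file);
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```

The trade-off is scale: everything passes through the driver, so for large datasets the repartition(1) approach above (or one file per partition) is the safer choice.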

Save to multiple JSON files, one per partition

Scala

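import session.implicits._ // provides the Encoder[String] that mapPartitions needs

jsonDs
  .mapPartitions(vals => Iterator("[" + vals.mkString(",") + "]")) // one JSON array per partition
  .write
  .text("path")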

Java

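import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Encoders;

jsonDs
  // the cast selects the Java overload; without it the lambda can be ambiguous on Scala 2.12+ builds
  .mapPartitions((MapPartitionsFunction<String, String>) input ->
      Arrays.asList("[" + StringUtils.join(input, ",") + "]").iterator(), Encoders.STRING())
  .write()
  .text("path");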
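The per-partition function can be checked on its own, outside Spark. This plain-Java sketch has the same shape as the mapPartitions lambda (one iterator of JSON strings in, one "[...]" string out) but swaps commons-lang3 StringUtils.join for the JDK's java.util.StringJoiner; the class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;

public class PartitionToJsonArray {
    // Consumes one partition's JSON-object strings and joins them into a
    // single JSON array. With no elements, StringJoiner returns prefix + suffix, i.e. "[]".
    static String joinPartition(Iterator<String> partition) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        partition.forEachRemaining(joiner::add);
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(joinPartition(List.of("{\"id\":\"1\"}", "{\"id\":\"2\"}").iterator()));
        // An empty partition still yields a valid (empty) JSON array.
        System.out.println(joinPartition(List.<String>of().iterator()));
    }
}
```

Note that each part file is then a valid JSON array on its own, but the output directory as a whole is not a single JSON document; a consumer has to read the part files one by one.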