Spark Row to JSON

nav*_*ige 17 json scala apache-spark apache-spark-sql

我想从Spark v.1.6(使用scala)数据框创建一个JSON.我知道有一个简单的解决方案df.toJSON.

但是,我的问题看起来有点不同.例如,考虑具有以下列的数据帧:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |
Run Code Online (Sandbox Code Playgroud)

我想最后有一个数据帧

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |
Run Code Online (Sandbox Code Playgroud)

其中,C是含有JSON C1,C2,C3.不幸的是,我在编译时我不知道数据帧是什么样的(除了列AB总是"固定").

至于我之所以需要这个:我使用Protobuf来发送结果.不幸的是,我的数据框有时会有比预期更多的列,我仍然会通过Protobuf发送它们,但我不想在定义中指定所有列.

我怎样才能做到这一点?

Mic*_*ust 22

Spark 2.1应该对这个用例有本机支持(参见#15354).

import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
Run Code Online (Sandbox Code Playgroud)

  • 使用上述解决方案时,我得到的 json 结果如下,带有额外字符 **\** ```\"SIGNAL\":[{\"TIME\":1569382072016,\"VALUE\":-9}], \"SIGNAL01\":[{\"TIME\":1569382099654,\"VALUE\":8.0}]}"}``` 如何从结果中删除多余的字符?@Michael Armbrust (2认同)

小智 10

我使用这个命令来解决 to_json 问题:

output_df = (df.select(to_json(struct(col("*"))).alias("content")))
Run Code Online (Sandbox Code Playgroud)


zer*_*323 5

首先让我们将C转换为struct:

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
Run Code Online (Sandbox Code Playgroud)

这个结构可以toJSON像以前一样转换为JSONL :

dfStruct.toJSON.collect
// Array[String] = Array(
//   {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
//   {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
Run Code Online (Sandbox Code Playgroud)

我不知道任何可以转换单个列的内置方法,但您可以单独转换它,join或者在UDF中使用您喜欢的JSON解析器.

case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}


val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
Run Code Online (Sandbox Code Playgroud)

  • 就像他说的,只需使用“UDF”即可。您甚至不必在“UDF”中使用成熟的 JSON 解析器——您可以使用“map”和“mkString”即时创建 JSON 字符串。您可能需要使用“DataFrame.columns”或可能的“DataFrame.dtypes”来制作“select”语句并作为“UDF”中“map”的基础。 (2认同)
  • "Row"是丑陋的,因为我讨厌在Scala中处理JSON - 这是文化的冲突,Loosey,goosey vs strong,static typing.SQL是松散的goosey - 你是一个远离定义新类型的`select` - 因此`Row`是混乱的.Avro的`GenericRecord`也存在同样的问题. (2认同)

Dav*_*fin 5

这里,没有JSON解析器,它适应您的架构:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"), 
    concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
      val c = dt._1;
      val t = dt._2;
      concat(
        lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "")  ),
        col(c),
        lit(if(t=="StringType") "\""; else "") 
      )
    }):_*), 
    lit("}")
  ) as "C"
).collect()
Run Code Online (Sandbox Code Playgroud)