nav*_*ige 17 json scala apache-spark apache-spark-sql
我想从Spark v.1.6(使用scala)数据框创建一个JSON.我知道有一个简单的解决方案df.toJSON.
但是,我的问题看起来有点不同.例如,考虑具有以下列的数据帧:
| A | B | C1 | C2 | C3 |
-------------------------------------------
| 1 | test | ab | 22 | TRUE |
| 2 | mytest | gh | 17 | FALSE |
Run Code Online (Sandbox Code Playgroud)
我想最后有一个数据帧
| A | B | C |
----------------------------------------------------------------
| 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
Run Code Online (Sandbox Code Playgroud)
其中,C是含有JSON C1,C2,C3.不幸的是,我在编译时我不知道数据帧是什么样的(除了列A和B总是"固定").
至于我之所以需要这个:我使用Protobuf来发送结果.不幸的是,我的数据框有时会有比预期更多的列,我仍然会通过Protobuf发送它们,但我不想在定义中指定所有列.
我怎样才能做到这一点?
Mic*_*ust 22
Spark 2.1应该对这个用例有本机支持(参见#15354).
import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
Run Code Online (Sandbox Code Playgroud)
小智 10
我使用这个命令来解决 to_json 问题:
output_df = (df.select(to_json(struct(col("*"))).alias("content")))
Run Code Online (Sandbox Code Playgroud)
首先让我们将C转换为struct:
val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
Run Code Online (Sandbox Code Playgroud)
这个结构可以toJSON像以前一样转换为JSONL :
dfStruct.toJSON.collect
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}},
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
Run Code Online (Sandbox Code Playgroud)
我不知道任何可以转换单个列的内置方法,但您可以单独转换它,join或者在UDF中使用您喜欢的JSON解析器.
case class C(C1: String, C2: Int, C3: Boolean)
object CJsonizer {
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
implicit val formats = Serialization.formats(org.json4s.NoTypeHints)
def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}
val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
CJsonizer.toJSON(c1, c2, c3))
df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
Run Code Online (Sandbox Code Playgroud)
这里,没有JSON解析器,它适应您的架构:
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
df.select(
col(df.columns(0)),
col(df.columns(1)),
concat(
lit("{"),
concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
val c = dt._1;
val t = dt._2;
concat(
lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ),
col(c),
lit(if(t=="StringType") "\""; else "")
)
}):_*),
lit("}")
) as "C"
).collect()
Run Code Online (Sandbox Code Playgroud)