I want to create JSON from a Spark v1.6 DataFrame (using Scala). I know there is the simple solution df.toJSON.
However, my problem looks a bit different. For example, consider a DataFrame with the following columns:
| A | B      | C1 | C2 | C3    |
-------------------------------
| 1 | test   | ab | 22 | TRUE  |
| 2 | mytest | gh | 17 | FALSE |
In the end, I want a DataFrame like this:
| A | B      | C                                        |
----------------------------------------------------------
| 1 | test   | { "c1" : "ab", "c2" : 22, "c3" : TRUE }  |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
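For the first table, one possible sketch (my own attempt, not from the question) is to pack the three columns into a struct with lowercase field names and serialize it. Note that to_json only exists in Spark 2.1+; on Spark 1.6 this step would need a UDF instead:

```scala
import org.apache.spark.sql.functions._

// Sketch, assuming Spark 2.1+ (to_json is not available in 1.6).
// Renames C1/C2/C3 to c1/c2/c3 inside the struct before serializing.
val withC = df.select(
  col("A"),
  col("B"),
  to_json(struct(
    col("C1").alias("c1"),
    col("C2").alias("c2"),
    col("C3").alias("c3")
  )).alias("C")
)
```

Booleans and numbers keep their types inside the struct, so to_json emits 22 and true rather than quoted strings.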
Input:
 id1   id2   name  value         epid
"xxx" "yyy" "EAN" "5057723043" "1299"
"xxx" "yyy" "MPN" "EVBD"       "1299"
I want:
{
  "id1": "xxx",
  "id2": "yyy",
  "item_specifics": [
    {
      "name": "EAN",
      "value": "5057723043"
    },
    {
      "name": "MPN",
      "value": "EVBD"
    },
    {
      "name": "EPID",
      "value": "1299"
    }
  ]
}
I tried the following two solutions, from "How to aggregate columns into a JSON array?" and "How to merge rows into column on Spark dataframe as valid JSON to write it into MySQL":
pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value")))).alias("item_specifics")) // => not working
  .agg(collect_list(struct(col("name"), col("value"))).alias("item_specifics"))
But I got:
{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }
How can I solve this? Thanks.
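One possible direction (a sketch under the assumption of Spark 2.1+, where to_json and collect_list on structs are available): first aggregate the name/value pairs into an array of structs, then serialize the whole grouped row, rather than serializing each pair before aggregation. The column and variable names below follow the question; the final to_json(struct(...)) step is my addition:

```scala
import org.apache.spark.sql.functions._

// Step 1: one row per (id1, id2) with an array<struct<name,value>> column.
val grouped = pi_df
  .groupBy(col("id1"), col("id2"))
  .agg(collect_list(struct(col("name"), col("value"))).alias("item_specifics"))

// Step 2: serialize the entire grouped row to a single JSON document,
// so item_specifics becomes a JSON array nested under id1/id2.
val jsonDF = grouped.select(
  to_json(struct(col("id1"), col("id2"), col("item_specifics"))).alias("json")
)
```

Calling to_json inside collect_list (the commented-out line in the question) serializes each pair separately and yields an array of JSON strings, which is why the output looked flattened; serializing once, after aggregation, keeps the nesting.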