Collecting data from DataFrame columns into JSON

use*_*602 1 json scala apache-spark apache-spark-sql

I have a DataFrame whose two "key" columns are `id1` and `id2`:

val df1 = Seq(
  (1, 11, "n1", "d1"),
  (1, 22, "n2", "d2"),
  (2, 11, "n3", "d3"),
  (2, 11, "n4", "d4")
).toDF("id1", "id2", "number", "data")

scala> df1.show
+---+---+------+----+
|id1|id2|number|data|
+---+---+------+----+
|  1| 11|    n1|  d1|
|  1| 22|    n2|  d2|
|  2| 11|    n3|  d3|
|  2| 11|    n4|  d4|
+---+---+------+----+

I want to get JSON grouped by the DataFrame's keys, like this:

+---+---+------------------------------------------------------------------+
|id1|id2|json                                                              |
+---+---+------------------------------------------------------------------+
|  1| 11|[{"number" : "n1", "data": "d1"}]                                 |
|  1| 22|[{"number" : "n2", "data": "d2"}]                                 |
|  2| 11|[{"number" : "n3", "data": "d3"}, {"number" : "n4", "data": "d4"}]|
+---+---+------------------------------------------------------------------+

Versions:

Spark: 2.2
Scala: 2.11

Sha*_*ica 5

This can be done by first using `to_json` to convert the `number` and `data` columns into JSON strings, then using `groupBy` on the two id columns with a `collect_list` aggregation to get the desired result.

import org.apache.spark.sql.functions.{collect_list, struct, to_json}

val df2 = df1.withColumn("json", to_json(struct($"number", $"data")))
  .groupBy("id1", "id2").agg(collect_list($"json").as("json"))
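Note that `collect_list` over pre-serialized strings yields an `array<string>` column rather than the single JSON array string shown in the expected output. As a sketch of an alternative (not part of the original answer): collect the structs first and serialize the whole array in one `to_json` call. Be aware that `to_json` only accepts arrays of structs from Spark 2.3 onward, so this would require upgrading from the Spark 2.2 mentioned in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct, to_json}

val spark = SparkSession.builder().master("local[*]").appName("json-agg").getOrCreate()
import spark.implicits._

val df1 = Seq(
  (1, 11, "n1", "d1"),
  (1, 22, "n2", "d2"),
  (2, 11, "n3", "d3"),
  (2, 11, "n4", "d4")
).toDF("id1", "id2", "number", "data")

// Collect the structs per key, then serialize the collected array
// as one JSON string (requires Spark 2.3+).
val df3 = df1.groupBy("id1", "id2")
  .agg(to_json(collect_list(struct($"number", $"data"))).as("json"))

df3.show(false)
```

On Spark 2.2 itself, a workaround in the same spirit is to concatenate the per-row JSON strings with `concat_ws` and wrap them in brackets, though the result is built by string manipulation rather than a JSON serializer.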