How do I aggregate columns into a JSON array?

rob*_*ico 3 apache-spark apache-spark-sql

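How can I transform data like the following so that it can be stored in ElasticSearch?

This is a Dataset of beans, which I want to aggregate by product into a JSON array.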

List<Bean> data = new ArrayList<Bean>();
data.add(new Bean("book","John",59));
data.add(new Bean("book","Björn",61));
data.add(new Bean("tv","Roger",36));
Dataset<Row> ds = spark.createDataFrame(data, Bean.class);

ds.show(false);

+------+-------+---------+
|amount|product|purchaser|
+------+-------+---------+
|59    |book   |John     |
|61    |book   |Björn    |
|36    |tv     |Roger    |
+------+-------+---------+


ds = ds.groupBy(col("product")).agg(collect_list(map(ds.col("purchaser"),ds.col("amount")).as("map")));
ds.show(false);

+-------+---------------------------------------------+
|product|collect_list(map(purchaser, amount) AS `map`)|
+-------+---------------------------------------------+
|tv     |[[Roger -> 36]]                              |
|book   |[[John -> 59], [Björn -> 61]]                |
+-------+---------------------------------------------+

This is what I would like to convert it to:

+-------+------------------------------------------------------------------+
|product|json                                                              |
+-------+------------------------------------------------------------------+
|tv     |[{purchaser: "Roger", amount:36}]                                 |
|book   |[{purchaser: "John", amount:59}, {purchaser: "Björn", amount:61}] |
+-------+------------------------------------------------------------------+

rob*_*ico 7

Solution:

ds.groupBy(col("product"))
  // Alias the aggregated column (not the inner to_json) so the result column is named "json".
  .agg(collect_list(to_json(struct(col("purchaser"), col("amount")))).alias("json"));
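To make the shape of the result concrete without a Spark session, here is a minimal plain-Java sketch of what the grouping computes: each row becomes one JSON object, and the objects are collected into a list per product. This is not Spark code. The names `JsonAggregationSketch`, `Purchase`, `toJson` and `aggregate` are hypothetical and exist only for illustration, and the hand-rolled JSON assumes purchaser names need no escaping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation (no Spark) of grouping rows by product and
// collecting one JSON object per row. All names here are hypothetical.
class JsonAggregationSketch {

    // Mirrors the Bean from the question: product, purchaser, amount.
    static final class Purchase {
        final String product;
        final String purchaser;
        final int amount;

        Purchase(String product, String purchaser, int amount) {
            this.product = product;
            this.purchaser = purchaser;
            this.amount = amount;
        }

        // Rough analogue of to_json(struct(purchaser, amount)).
        // Assumes purchaser contains no characters that need JSON escaping.
        String toJson() {
            return "{\"purchaser\":\"" + purchaser + "\",\"amount\":" + amount + "}";
        }
    }

    // Rough analogue of groupBy("product").agg(collect_list(...)):
    // one list of JSON object strings per product, in encounter order.
    static Map<String, List<String>> aggregate(List<Purchase> rows) {
        Map<String, List<String>> byProduct = new LinkedHashMap<>();
        for (Purchase row : rows) {
            byProduct.computeIfAbsent(row.product, k -> new ArrayList<>()).add(row.toJson());
        }
        return byProduct;
    }

    public static void main(String[] args) {
        List<Purchase> data = Arrays.asList(
            new Purchase("book", "John", 59),
            // Unicode escape for the o-umlaut in the question's second purchaser name.
            new Purchase("book", "Bj\u00f6rn", 61),
            new Purchase("tv", "Roger", 36));

        // Joining the collected objects gives one JSON array string per product.
        aggregate(data).forEach((product, objects) ->
            System.out.println(product + " -> [" + String.join(",", objects) + "]"));
    }
}
```

Note that the Spark solution above yields an array of JSON strings per product rather than a single JSON string. If one JSON array string per product is needed, swapping the nesting to `to_json(collect_list(struct(...)))` should work on Spark versions where `to_json` accepts an array of structs, as far as I know; check this against the version you run.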