spark aggregation for array column

Ole*_*liv 3 scala aggregate-functions apache-spark apache-spark-sql

I have a dataframe with a array column.

val json = """[
{"id": 1, "value": [11, 12, 18]},
{"id": 2, "value": [23, 21, 29]}
]"""

val df = spark.read.json(Seq(json).toDS)

scala> df.show
+---+------------+
| id|       value|
+---+------------+
|  1|[11, 12, 18]|
|  2|[23, 21, 29]|
+---+------------+
Run Code Online (Sandbox Code Playgroud)

Now I need to apply different aggregate functions to the value column. I can call explode and groupBy, for example

df.select($"id", explode($"value").as("value")).groupBy($"id").agg(max("value"), avg("value")).show

+---+----------+------------------+
| id|max(value)|        avg(value)|
+---+----------+------------------+
|  1|        18|13.666666666666666|
|  2|        29|24.333333333333332|
+---+----------+------------------+
Run Code Online (Sandbox Code Playgroud)

What bothers me here is that I explode my DataFrame into a bigger one and then reduce it to the original calling groupBy.

Is there a better (i.e. more efficient) way to call aggregated functions on array column? Probably I can implement UDF but I don't want to implement all aggregation UDFs myself.

EDIT. Someone referenced this SO question but it doesn't work in my case. The size is working fine

scala> df.select($"id", size($"value")).show
+---+-----------+
| id|size(value)|
+---+-----------+
|  1|          3|
|  2|          3|
+---+-----------+
Run Code Online (Sandbox Code Playgroud)

But avg or max do not work.

Wad*_*sen 6

简短的答案是“否”,您必须实现自己的UDF才能在数组列上进行聚合。至少在最新版本的Spark(撰写本文时为2.3.1)中。您正确断言的方法效率不高,因为它会迫使您爆炸行或支付在Dataset API中工作的序列化和反序列化成本。

对于可能会发现此问题的其他人,要使用数据集以类型安全的方式编写聚合,则可以使用Aggregator API,该API显然没有得到很好的记录,并且由于类型签名变得非常冗长,因此使用起来非常混乱。

较长的答案是,此功能即将很快(?)中的Apache星火2.4。

父问题SPARK-23899添加:

  • array_max
  • array_min
  • 骨料
  • 地图
  • array_distinct
  • array_remove
  • array_join

还有很多其他

易于使用数组类型操作扩展Spark SQL API的屏幕截图幻灯片11

本讲座“ 扩展星火SQL API与更易于使用数组类型的操作 ”,提出在2018年6月星火+ AI峰会涵盖了新的功能。

如果发布了该版本,则可以max像示例中那样使用该函数,但是average会有些棘手。奇怪的是,不存在array_sum,但是可以从aggregate函数中构建它。它可能看起来像:

def sum_array(array_col: Column) = aggregate($"my_array_col", 0, (s, x) => s + x, s => s) df.select(sum_array($"my_array_col") 其中零值是聚合缓冲区的初始状态。

如您所指出的,size已经可以获取数组的长度,这意味着可以计算平均值。