Aggregating multiple columns with a custom function in Spark

ant*_*ell 32 scala dataframe apache-spark apache-spark-sql orc

I was wondering if there is some way to specify a custom aggregation function for Spark DataFrames over multiple columns.

I have a table like this of the type (name, item, price):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

I would like to aggregate the items and their costs for each person into a list like this:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible with DataFrames? I recently learned about collect_list, but it appears to work for only one column.

小智 65

Consider using the struct function to group the columns together before collecting them into a list:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .show(false)

Output:

+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
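As a hedged follow-up (not part of the answer above): the resulting array of structs can be unnested again with explode, and the fields reached by name, e.g.:

import org.apache.spark.sql.functions.{collect_list, explode, struct}

// Same df as above; explode turns each collected (food, price) struct back into a row.
df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .select($"name", explode($"foods").as("item"))
  .select($"name", $"item.food", $"item.price")
  .show(false)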

  • I'd like to mention that this approach looks cleaner than the accepted answer, but unfortunately it doesn't work with Spark 1.6, because `collect_list()` does not accept a struct (a possible workaround is sketched after these comments). (3 upvotes)
  • Works in Spark 2.1. (3 upvotes)
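A hedged sketch for the Spark 1.6 limitation mentioned in the comment above: one fallback is to drop to the RDD API and group the (food, price) pairs per name there. The column positions (0 = name, 1 = food, 2 = price) are assumed from the example DataFrame.

// Spark 1.6-friendly alternative: group pairs with the RDD API instead of collect_list(struct(...)).
val groupedRdd = df.rdd
  .map(row => (row.getString(0), (row.getString(1), row.getDouble(2))))
  .groupByKey()
  .mapValues(_.toSeq)

groupedRdd.collect().foreach(println)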

Dav*_*fin 33

The easiest way to do this with DataFrames is to first collect the two lists, and then use a UDF to zip the two lists together. Something like:

import org.apache.spark.sql.functions.{col, collect_list, udf}
import sqlContext.implicits._

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
// +----+---------------------------------------------+
// |name|food                                         |
// +----+---------------------------------------------+
// |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
// |bill|[[apple,0.99], [taco,2.59]]                  |
// +----+---------------------------------------------+

  • The answer assumes (probably correctly) that collect_list() preserves the order of elements across both the food and price columns, i.e. that the food and price from the same row end up at the same index in the two collected lists. Is this order-preserving behavior guaranteed? (It would make sense, but I'm not sure from looking at the Scala code of collect_list, not being a Scala programmer.) (6 upvotes)
  • Afaik, the order of the elements is not guaranteed to be the same; cf. https://stackoverflow.com/questions/40407514/use-more-than-one-collect-list-in-one-query-in-spark-sql (one way to make the order explicit is sketched below). (2 upvotes)
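A hedged aside on the ordering concern raised in these comments (not taken from the thread): a common pattern is to carry an explicit ordering column, here a hypothetical ord, inside the struct and sort the collected array afterwards, so the result no longer depends on the order in which collect_list encounters the rows.

import org.apache.spark.sql.functions.{collect_list, sort_array, struct}
import sqlContext.implicits._

// "ord" is a hypothetical column holding the desired order (e.g. a timestamp or row number).
df.groupBy($"name")
  .agg(sort_array(collect_list(struct($"ord", $"food", $"price"))).as("foods"))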

小智 6

Regarding your point that collect_list seems to work for only one column: to make collect_list work over multiple columns, you have to wrap the desired columns in a struct. For example:

import org.apache.spark.sql.functions.{collect_list, struct}

val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")) as "foods")

aggregatedData.show(false)
+----+------------------------------------------------+
|name|foods                                           |
+----+------------------------------------------------+
|john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
|bill|[[apple, 0.99], [taco, 2.59]]                   |
+----+------------------------------------------------+


小智 5

A perhaps better way than the zip function (since UDFs and UDAFs are bad for performance) is to wrap the two columns into a struct.

This would probably work as well:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

df.select('name, struct('food, 'price).as("tuple"))
  .groupBy('name)
  .agg(collect_list('tuple).as("tuples"))
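As a hedged aside (not part of the answer above), the same struct-based aggregation can also be expressed through the SQL interface, assuming Spark 2.x where createOrReplaceTempView and a SparkSession named spark are available; "purchases" is a hypothetical view name.

// Register the DataFrame as a temporary view and aggregate with Spark SQL built-ins only.
df.createOrReplaceTempView("purchases")

spark.sql("""
  SELECT name, collect_list(struct(food, price)) AS tuples
  FROM purchases
  GROUP BY name
""").show(false)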