Scala-Spark使用参数值动态调用groupby和agg

Neh*_*haM 4 customization scala group-by aggregate apache-spark

我想编写自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合映射.我不知道前面的列名和聚合映射.我想写一个类似下面的函数.但我是Scala的新手,我无法解决它.

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}
Run Code Online (Sandbox Code Playgroud)

并希望称之为

val listOfStrings =  List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?任何人都可以帮助我.

Tza*_*har 8

您的代码几乎是正确的 - 有两个问题:

  1. 函数的返回类型是DataFrame,但最后一行是aggregated.show()返回的Unit.删除show要返回的调用aggregated,或者只是agg立即返回结果

  2. DataFrame.groupBy期望参数如下:col1: String, cols: String*- 所以你需要传递匹配的参数:第一列,然后是其余的列作为参数列表,你可以这样做:df.groupBy(cols.head, cols.tail: _*)

总而言之,您的功能将是:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated
}
Run Code Online (Sandbox Code Playgroud)

或者,类似的较短版本:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
  df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
Run Code Online (Sandbox Code Playgroud)

如果你想打电话给show你的函数中:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols.head, cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
  aggregated
}
Run Code Online (Sandbox Code Playgroud)