对spark数据帧的同一列进行多次聚合操作

Ric*_*ker 28 dataframe apache-spark apache-spark-sql

我有三个字符串类型的数组包含以下信息:

  • groupBy数组:包含我想要对数据进行分组的列的名称.
  • aggregate array:包含我想要聚合的列的名称.
  • operations array:包含我想要执行的聚合操作

我正在尝试使用spark数据帧来实现这一目标.Spark数据框提供了agg(),您可以在其中传递Map [String,String](列名和相应的聚合操作)作为输入,但是我想对数据的同一列执行不同的聚合操作.有关如何实现这一目标的任何建议?

zer*_*323 56

斯卡拉:

例如,您可以映射一个函数列表,其中包含mapping从name到函数的定义:

import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+
Run Code Online (Sandbox Code Playgroud)

要么

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Run Code Online (Sandbox Code Playgroud)

不幸的是,内部使用的解析器SQLContext不会公开暴露,但您总是可以尝试构建纯SQL查询:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
Run Code Online (Sandbox Code Playgroud)

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
Run Code Online (Sandbox Code Playgroud)


ais*_*ury 7

做类似的事情

from pyspark.sql import functions as F

df.groupBy('groupByColName') \
  .agg(F.sum('col1').alias('col1_sum'),
       F.max('col2').alias('col2_max'),
       F.avg('col2').alias('col2_avg')) \
  .show()
Run Code Online (Sandbox Code Playgroud)


Zep*_*hro 6

对于那些想知道如何在没有 Python 列表理解的情况下编写 @zero323 答案的人:

from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)
Run Code Online (Sandbox Code Playgroud)