mat*_*ieu 2 aggregate-functions user-defined-functions apache-spark apache-spark-sql
我Aggregator[]为Strings 创建了一个自定义.
我想将它应用于DataFrame所有列都是字符串的所有列,但列号是任意的.
我坚持写正确的表达方式.我想写这样的东西:
df.agg( df.columns.map( c => myagg(df(c)) ) : _*)
Run Code Online (Sandbox Code Playgroud)
鉴于各种接口,这显然是错误的.
我看了一下RelationalGroupedDataset.agg(expr: Column, exprs: Column*)代码,但我不熟悉表达式操作.
任何的想法 ?
相比之下UserDefinedAggregateFunctions,对单个字段(列)进行操作时,Aggregtors需要完整Row/值.
如果需要Aggregator,可以在代码段中使用,则必须按列名称进行参数化,并将其Row用作值类型.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
Run Code Online (Sandbox Code Playgroud)
用法示例:
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
Run Code Online (Sandbox Code Playgroud)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
Run Code Online (Sandbox Code Playgroud)
可以说,Aggregators在与静态类型相结合,使更多的意义Datasets比Dataset<Row>.
根据您的要求,您还可以使用Seq[_]累加器在单个过程中聚合多个列,并Row在单个merge调用中处理整个(记录).
| 归档时间: |
|
| 查看次数: |
2167 次 |
| 最近记录: |