mar*_*ius 3 scala apache-spark apache-spark-sql apache-spark-dataset
使用Scala Spark,如何使用类型化的数据集API舍入聚合列?
另外,如何通过groupby操作保留数据集的类型?
这是我目前拥有的:
case class MyRow(
k1: String,
k2: String,
c1: Double,
c2: Double
)
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1, row.k2))
.agg(
avg(_.c1),
avg(_.c2)
)
.map(r => MyRow(r._1._1, r._1._2, r._2, r._3))
}
Run Code Online (Sandbox Code Playgroud)
avg(_.c1),则会round(avg(_.c1))出现类型错误。四舍五入的正确方法是什么?.map(...)行感觉不对-有没有更优雅的方法来保留我的数据集的类型?谢谢!
虽然可接受的答案有效且更笼统,但在这种情况下,您也可以使用回合。您只需要在四舍五入后使用来键入列.as[T](也需要将类型定义为avg)。
.agg(
// Alternative ways to define a type to avg
round(avg((r: MyRow) => r.c1)).as[Double],
round(avg[MyRow](_.c2)).as[Double]
)
Run Code Online (Sandbox Code Playgroud)
round在类型错误上使用确实失败,因为agg期望类型TypedColumn[IN, OUT]和轮次的聚合函数提供Column(适用于数据帧)。
这里你需要的是一个舍入平均聚合函数,它没有提供org.apache.spark.sql.expressions.scalalang.typed._- 但你可以通过扩展执行平均聚合的类轻松地自己创建一个:
// Extend TypedAverage - round the result before returning it
class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) {
override def finish(reduction: (Double, Long)): Double = math.round(super.finish(reduction))
}
// A nice wrapper to create the TypedRoundAverage for a given function
def roundAvg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedRoundAverage(f).toColumn
// Now you can use "roundAvg" instead of "round"
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
ds.groupByKey(row => (row.k1, row.k2))
.agg(
roundAvg(_.c1),
roundAvg(_.c2)
)
.map { case ((k1, k2), c1, c2) => MyRow(k1, k2, c1, c2) } // just a nicer way to put it
}
Run Code Online (Sandbox Code Playgroud)
我看不到摆脱map操作的方法,因为 group-by 必然返回一个元组,但可以使用模式匹配使其更好一些