Jay*_*thi 5 scala apache-spark apache-spark-dataset
我有一个自定义Aggregator,它执行count-min草图聚合.它有效,但速度很慢(下面的代码).如果我使用基于UserDefinedAggregateFunction类的自定义UDAF,我会得到类似的慢性能.
如果我使用Dataset mapPartitionsAPI在分区内聚合然后跨分区减少,这会快得多.
问题 - UDAF和AggregatorAPI 的缓慢似乎是由每行发生的序列化/反序列化(编码)引起的.UDAF和Aggregator API是不是用于聚合成非平凡的数据结构,如count-min草图?mapPartitions是否是处理此问题的最佳方法?
Aggregator 示例代码:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.util.sketch.CountMinSketch
object AggTest extends App {
val input = "2008.csv"
val conf = new SparkConf().setMaster("local[4]").setAppName("tester")
val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
val df = sqlContext.read.format("csv").option("header", "true").load(input)
implicit val sketchEncoder = org.apache.spark.sql.Encoders.kryo[CountMinSketch]
case class SketchAgg(col: String) extends Aggregator[Row, CountMinSketch, CountMinSketch] {
def zero: CountMinSketch = CountMinSketch.create(500, 4, 2429)
def reduce(sketch: CountMinSketch, row: Row) = {
val a = row.getAs[Any](col)
sketch.add(a)
sketch
}
def merge(sketch1: CountMinSketch, sketch2: CountMinSketch) = {
sketch1.mergeInPlace(sketch2)
}
def finish(sketch: CountMinSketch) = sketch
def bufferEncoder: Encoder[CountMinSketch] = sketchEncoder
def outputEncoder: Encoder[CountMinSketch] = sketchEncoder
}
val sketch = df.agg(SketchAgg("ArrDelay")
.toColumn
.alias("sketch"))
.select("sketch")
.as[CountMinSketch]
.first()
}
Run Code Online (Sandbox Code Playgroud)