使用ArrayType的Spark UDAF作为bufferSchema性能问题

Lir*_*nBo 3 performance scala user-defined-functions apache-spark apache-spark-sql

我正在研究一个返回元素数组的UDAF.

每次更新的输入都是索引和值的元组.

UDAF的作用是对同一索引下的所有值求和.

例:

对于输入(索引,值):( 2,1),(3,1),(2,3)

应该返回(0,0,4,1,...,0)

逻辑工作正常,但我有更新方法的问题,我的实现只更新每行1个单元格,但该方法中的最后一个赋值实际上复制整个数组 - 这是多余的,非常耗时.

仅此分配负责我的查询执行时间的98%.

我的问题是,我怎样才能缩短那段时间?是否可以在缓冲区数组中分配1个值而无需替换整个缓冲区?

PS:我正在使用Spark 1.6,我不能很快升级它,所以请坚持使用适用于此版本的解决方案.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}
Run Code Online (Sandbox Code Playgroud)

use*_*411 9

TL; DR要么不使用UDAF,要么使用原始类型代替ArrayType.

没有 UserDefinedFunction

两种解决方案都应该避免内部和外部表示之间的昂贵杂耍.

使用标准聚合和 pivot

这使用标准SQL聚合.虽然在内部进行了优化,但是当键的数量和阵列的大小增加时,它可能会很昂贵.

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
Run Code Online (Sandbox Code Playgroud)

您可以:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
Run Code Online (Sandbox Code Playgroud)
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Run Code Online (Sandbox Code Playgroud)

使用带combineByKey/的RDD API aggregateByKey.

byKey具有可变缓冲区的普通旧聚合.没有花里胡哨,但是应该在广泛的输入范围内表现得相当好.如果您怀疑输入是稀疏的,您可以考虑更有效的中间表示,如可变Map.

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
Run Code Online (Sandbox Code Playgroud)
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Run Code Online (Sandbox Code Playgroud)

使用UserDefinedFunction原始类型

据我了解内部情况,性能瓶颈是ArrayConverter.toCatalystImpl.

它看起来像它被称为每个呼叫MutableAggregationBuffer.update,并依次分配新的GenericArrayData每一个Row.

如果我们重新定义bufferSchema为:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}
Run Code Online (Sandbox Code Playgroud)

二者updatemerge可以表示为在缓冲器中的原始值的平原替换.呼叫链将保持相当长的时间,但不需要复制/转换和疯狂分配.省略null检查你需要类似的东西

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
Run Code Online (Sandbox Code Playgroud)

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
Run Code Online (Sandbox Code Playgroud)

分别.

最后evaluate应该Row把它转换成输出Seq:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)
Run Code Online (Sandbox Code Playgroud)

请注意,在此实现中可能存在瓶颈merge.虽然它不应该引入任何新的性能问题,使用M桶,每次调用merge都是O(M).

使用K个唯一密钥和P分区,在最坏的情况下,它将被称为M*K次,其中每个密钥在每个分区上至少出现一次.这有效地增加了merge组分的共聚度为O(M*N*K).

一般来说,你无能为力.但是,如果您对数据分布做出具体假设(数据稀疏,密钥分配是统一的),您可以稍微缩短一下,然后先进行随机播放:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))
Run Code Online (Sandbox Code Playgroud)

如果满足假设,它应该:

  • 通过改组稀疏对而不是像密集阵列一样,违反直觉地减少了随机大小Rows.
  • 仅使用更新来聚合数据(每个O(1))可能仅作为索引的子集接触.

但是,如果不满足一个或两个假设,您可以预期随机播放的大小将增加,而更新的数量将保持不变.同时数据倾斜可以使事情比甚至更糟update- shuffle- merge场景.

使用Aggregator "强烈"键入Dataset:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
Run Code Online (Sandbox Code Playgroud)

可以如下所示使用

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
Run Code Online (Sandbox Code Playgroud)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+
Run Code Online (Sandbox Code Playgroud)