Spark AccumulatorParam with a generic parameter

Joh*_*n S 2 generics scala apache-spark

I have a problem using accumulators in Spark. As shown on the Spark website, if you want a custom accumulator you can simply extend (with an object) the AccumulatorParam trait. The problem is that I would like to, but cannot, make that object generic, like this:

object SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {

    override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()

    override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2

}

But this gives me a compile error, because objects cannot take type parameters. My situation doesn't really allow me to define a separate SeqAccumulatorParam for each type I need, since that would lead to a lot of ugly code duplication.

I have an alternative: simply put all the results in an RDD and later iterate over them with a single accumulator defined for that one concrete type. But the generic version would be much nicer.
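For reference, that workaround can be sketched roughly like this (a minimal sketch, assuming a local SparkContext named `sc`; with the results already in an RDD, a plain `reduce` on the driver stands in for the accumulator):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SeqWorkaround {
  def main(args: Array[String]): Unit = {
    // Local context for illustration only.
    val conf = new SparkConf().setAppName("seq-accum-workaround").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Each element is already a Seq[Int], so instead of accumulating
    // on the executors, combine the sequences on the driver.
    val lists = (1 to 5).map(x => (0 to x).toList)
    val combined: Seq[Int] = sc.parallelize(lists).reduce(_ ++ _)

    println(combined)
    sc.stop()
  }
}
```

This avoids the generic-accumulator problem entirely, at the cost of materializing the intermediate results as an RDD.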

My question is: is there any other way to create such an accumulator?

Shy*_*nki 7

Instead of a singleton object, you can simply use a class and create instances of it:

class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
    override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
    override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())  

val lists = (1 to 5).map(x => (0 to x).toList)
sc.parallelize(lists).foreach(x => seqAccum += x)

seqAccum.value
// Seq[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 0, 1, 2, 0, 1)
// The result may be in a different order.

// For Doubles.
val seqAccumD = sc.accumulator(Seq[Double]())(new SeqAccumulatorParam[Double]())
sc.parallelize(lists.map(x => x.map(_.toDouble))).foreach(x => seqAccumD += x)

seqAccumD.value
// Seq[Double] = List(0.0, 1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0)