Joh*_*n S 2 generics scala apache-spark
我在Spark中使用累加器有问题.如Spark网站所示,如果您想要自定义累加器,您可以简单地扩展(使用对象)AccumulatorParam特征.问题是我想但不能使该对象具有通用性,例如:
object SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}
Run Code Online (Sandbox Code Playgroud)
但这给了我一个编译错误,因为对象不能使用泛型参数.我的情况并没有真正允许我SeqAccumulatorParam为每个给定的类型定义一个,因为这会导致很多丑陋的代码重复.
我有一个替代方法,只是将所有结果放在一个RDD,然后用一个累加器迭代它们,为那个单一类型定义,但这会更好.
我的问题是:有没有其他方法来创建累加器?
您可以简单地使用类来创建对象,而不是单例对象.
class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}
val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())
val lists = (1 to 5).map(x => (0 to x).toList)
sc.parallelize(lists).foreach(x => seqAccum += x)
seqAccum.value
// Seq[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 0, 1, 2, 0, 1)
// result can be in different order.
// For Doubles.
val seqAccumD = sc.accumulator(Seq[Double]())(new SeqAccumulatorParam[Double]())
sc.parallelize(lists.map(x => x.map(_.toDouble))).foreach(x => seqAccumD += x)
seqAccumD.value
// Seq[Double] = List(0.0, 1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1292 次 |
| 最近记录: |