ozi*_*zil 1 scala accumulator apache-spark rdd
我正在尝试使用Apache Spark中的自定义累加器来累加一个集合。结果应具有Set [String]类型。为此,我创建了自定义累加器:
object SetAccumulatorParam extends AccumulatorParam[Set[String]] {
def addInPlace(r1: mutable.Set[String], r2: mutable.Set[String]): mutable.Set[String] = {
r1 ++= r2
}
def zero(initialValue: mutable.Set[String]): mutable.Set[String] = {
Set()
}
}
Run Code Online (Sandbox Code Playgroud)
但是我无法实例化这种类型的变量。
val tags = sc.accumulator(Set(""))(SetAccumulatorParam)
Run Code Online (Sandbox Code Playgroud)
导致错误。请帮助。
required: org.apache.spark.AccumulatorParam[Set[String]]
Run Code Online (Sandbox Code Playgroud)
除Traian的答案外,这是spark 2.x的一般情况SetAccumulator。
import org.apache.spark.util.AccumulatorV2
class SetAccumulator[T](var value: Set[T]) extends AccumulatorV2[T, Set[T]] {
def this() = this(Set.empty[T])
override def isZero: Boolean = value.isEmpty
override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T](value)
override def reset(): Unit = value = Set.empty[T]
override def add(v: T): Unit = value = value + v
override def merge(other: AccumulatorV2[T, Set[T]]): Unit = value = value ++ other.value
}
Run Code Online (Sandbox Code Playgroud)
您可以像这样使用它:
val accum = new SetAccumulator[String]()
spark.sparkContext.register(accum, "My Accum") // Optional, name it for SparkUI
spark.sparkContext.parallelize(Seq("a", "b", "a", "b", "c")).foreach(s => accum.add(s))
accum.value
Run Code Online (Sandbox Code Playgroud)
哪个输出:
Set[String] = Set(a, b, c)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3702 次 |
| 最近记录: |