How do I create a custom Set accumulator, i.e. Set[String]?

ozi*_*zil 1 scala accumulator apache-spark rdd

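I am trying to use a custom accumulator in Apache Spark to accumulate values into a set. The result should have the type Set[String]. To do this, I created a custom accumulator: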

object SetAccumulatorParam extends AccumulatorParam[Set[String]] {
    def addInPlace(r1: mutable.Set[String], r2: mutable.Set[String]): mutable.Set[String] = {
        r1 ++= r2
    }

    def zero(initialValue: mutable.Set[String]): mutable.Set[String] = {
        Set()
    }
}

However, I cannot instantiate a variable of this type.

val tags = sc.accumulator(Set(""))(SetAccumulatorParam)

This results in an error. Please help.

required: org.apache.spark.AccumulatorParam[Set[String]]

Rya*_*ier 5

In addition to Traian's answer, here is a general-purpose SetAccumulator for Spark 2.x.

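import org.apache.spark.util.AccumulatorV2

// Accumulates individual elements of type T into an immutable Set[T].
class SetAccumulator[T](var value: Set[T]) extends AccumulatorV2[T, Set[T]] {
  def this() = this(Set.empty[T])
  // The zero state is the empty set.
  override def isZero: Boolean = value.isEmpty
  // Sharing `value` with the copy is safe because the set is immutable.
  override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T](value)
  override def reset(): Unit = value = Set.empty[T]
  override def add(v: T): Unit = value = value + v
  // Fold another accumulator's partial result into this one.
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = value = value ++ other.value
}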
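The add/merge/reset behaviour can be checked locally before involving a cluster. This is only a rough sketch: it assumes the SetAccumulator class above is already in scope (for example, pasted into spark-shell), and the names `left` and `right` are purely illustrative stand-ins for two per-task copies.

```scala
// Two independent accumulators, standing in for the copies that two tasks would update.
val left = new SetAccumulator[String]()
val right = new SetAccumulator[String]()

left.add("a")
left.add("b")
right.add("b")
right.add("c")

// merge unions the other accumulator's set into this one; duplicates collapse.
left.merge(right)
assert(left.value == Set("a", "b", "c"))

// reset returns the accumulator to its zero state, the empty set.
left.reset()
assert(left.isZero)
```

Because the underlying Set is immutable, merging never modifies `right`; only the `value` reference held by `left` is replaced.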

You can use it like this:

val accum = new SetAccumulator[String]()
// Registration is required before the accumulator is used in a task; the name is optional and shown in the Spark UI.
spark.sparkContext.register(accum, "My Accum")

spark.sparkContext.parallelize(Seq("a", "b", "a", "b", "c")).foreach(s => accum.add(s))

accum.value

Which outputs:

Set[String] = Set(a, b, c)
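If you need to stay on the older API used in the question, the mismatch appears to be that the object is declared as AccumulatorParam[Set[String]] (the immutable Set) while the method signatures use mutable.Set[String]. A minimal sketch of one way to line the types up, assuming a Spark 1.x or 2.x version where AccumulatorParam is still available (it is deprecated since 2.0) and that the code is pasted into spark-shell; `collectTags` is a hypothetical helper added only for illustration:

```scala
import org.apache.spark.{AccumulatorParam, SparkContext}

// Assumption: Spark 1.x/2.x, where the legacy AccumulatorParam API still exists.
object SetAccumulatorParam extends AccumulatorParam[Set[String]] {
  // Combine two partial results; immutable sets are unioned into a new set.
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2

  // The neutral element for union is the empty set.
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Hypothetical helper: `sc` is assumed to be an existing SparkContext.
def collectTags(sc: SparkContext): Set[String] = {
  // Start from an empty set; Set("") would leave an empty string in the result.
  val tags = sc.accumulator(Set.empty[String])(SetAccumulatorParam)
  sc.parallelize(Seq("a", "b", "a")).foreach(s => tags += Set(s))
  tags.value
}
```

On Spark 2.x and later, the AccumulatorV2 version above is the better fit, since the legacy API is deprecated there.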