Dhi*_*raj 6 scala apache-spark rdd
我试图在Scala shell(驱动程序)中定义String类型的累加器变量,但我不断收到以下错误: -
scala> val myacc = sc.accumulator("Test")
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String]
val myacc = sc.accumulator("Test")
^
Run Code Online (Sandbox Code Playgroud)
对于Int或Double类型的累加器来说,这似乎没有问题.
谢谢
zer*_*323 12
这是因为Spark默认只提供类型的累加器Long,Double和Float.如果你需要别的东西,你必须扩展AccumulatorParam.
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = {
""
}
def addInPlace(s1: String, s2: String): String = {
s"$s1 $s2"
}
}
val stringAccum = sc.accumulator("")(StringAccumulatorParam)
val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2)
rdd.foreach(s => stringAccum += s)
stringAccum.value
Run Code Online (Sandbox Code Playgroud)
注意:
通常,您应该避免将累加器用于数据可能随时间显着增长的任务.它的行为类似于groupa collect,在最坏的情况下,由于缺乏资源,情况可能会失败.累加器主要用于简单的诊断任务,如跟踪基本统计数据.