Spark: saving an output HashSet to a file

Eda*_*ame -1 scala apache-spark

I have the following code:

import scala.collection.mutable.HashSet

val mySet: HashSet[String] = HashSet[String]()
val mySetBroadcastVar = sc.broadcast(mySet)

val output = input.map { t =>
  if (t.getA() != null) {
    mySetBroadcastVar.value.add(t.getA())
  }
}.count()

sc.parallelize(mySetBroadcastVar.value.toList, 1).saveAsTextFile("mySetValues")

The file mySetValues always ends up empty, even though it shouldn't be. Is this because mySetValues is saved before output is computed? How can I fix this? Thanks!

Ara*_*ram 5

  1. Broadcast variables are meant for sharing read-only data across tasks and stages efficiently.
  2. Tasks should not modify a broadcast variable: updates are neither visible on other nodes nor shipped back to the driver.
  3. For that you need an accumulator.

Example (from spark-shell):

scala> val acc = sc.accumulableCollection(scala.collection.mutable.HashSet[String]())
acc: org.apache.spark.Accumulable[scala.collection.mutable.HashSet[String],String] = Set()

scala> val names=sc.parallelize(Seq("aravind","sam","kenny","apple"))
names: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[86] at parallelize at <console>:22

scala> names.foreach( x => if(x.startsWith("a")) acc += x )

scala> acc
res27: org.apache.spark.Accumulable[scala.collection.mutable.HashSet[String],String] = Set(apple, aravind)

scala>
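
Applying the same idea to the code in the question, here is a minimal sketch. It assumes it runs where `sc` and `input` are already defined, as in the question, and that `input`'s elements expose a `getA(): String` method:

import scala.collection.mutable.HashSet

// Collect the distinct non-null getA() values with an accumulable collection
// instead of a broadcast variable, so updates flow back to the driver.
val acc = sc.accumulableCollection(HashSet[String]())

// foreach is an action: it triggers the job and applies the accumulator updates.
input.foreach { t =>
  if (t.getA() != null) acc += t.getA()
}

// acc.value is only reliable on the driver, after the action has completed.
sc.parallelize(acc.value.toList, 1).saveAsTextFile("mySetValues")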