如何在火花中处理错误SPARK-5063

G_c*_*_cy 8 scala apache-spark

我在println行中收到错误消息SPARK-5063

val d.foreach{x=> for(i<-0 until x.length)
      println(m.lookup(x(i)))}    
Run Code Online (Sandbox Code Playgroud)

d是 RDD[Array[String]] m RDD[(String, String)].有没有办法以我想要的方式打印?或者我如何将d转换RDD[Array[String]]Array[String]

maa*_*asg 15

SPARK-5063在尝试嵌套RDD操作时涉及更好的错误消息,这是不受支持的.

这是一个可用性问题,而不是功能问题.根本原因是RDD操作的嵌套,解决方案是打破它.

在这里,我们正在尝试加入dRDDmRDD.如果大小mRDD很大,那么a rdd.join将是推荐的方式,否则,如果mRDD很小,即适合每个执行者的记忆,我们可以收集它,广播它并进行"地图侧"连接.

加入

一个简单的连接将是这样的:

val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6))
val flat = rdd.flatMap(_.toSeq).keyBy(x=>x)
val res = flat.join(map).map{case (k,v) => v}
Run Code Online (Sandbox Code Playgroud)

如果我们想使用广播,我们首先需要在本地收集解析表的值,以便b/c到所有执行者.注意要广播的RDD 必须适合驱动程序以及每个执行程序的内存.

带有Broadcast变量的Map-side JOIN

val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6)))
val bcTable = sc.broadcast(map.collectAsMap)
val res2 = rdd.flatMap{arr => arr.map(elem => (elem, bcTable.value(elem)))} 
Run Code Online (Sandbox Code Playgroud)