G_c*_*_cy 8 scala apache-spark
我在println行中收到错误消息SPARK-5063
val d.foreach{x=> for(i<-0 until x.length)
println(m.lookup(x(i)))}
Run Code Online (Sandbox Code Playgroud)
d是 RDD[Array[String]] m RDD[(String, String)].有没有办法以我想要的方式打印?或者我如何将d转换RDD[Array[String]] 为Array[String]?
maa*_*asg 15
SPARK-5063在尝试嵌套RDD操作时涉及更好的错误消息,这是不受支持的.
这是一个可用性问题,而不是功能问题.根本原因是RDD操作的嵌套,解决方案是打破它.
在这里,我们正在尝试加入dRDD和mRDD.如果大小mRDD很大,那么a rdd.join将是推荐的方式,否则,如果mRDD很小,即适合每个执行者的记忆,我们可以收集它,广播它并进行"地图侧"连接.
一个简单的连接将是这样的:
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6))
val flat = rdd.flatMap(_.toSeq).keyBy(x=>x)
val res = flat.join(map).map{case (k,v) => v}
Run Code Online (Sandbox Code Playgroud)
如果我们想使用广播,我们首先需要在本地收集解析表的值,以便b/c到所有执行者.注意要广播的RDD 必须适合驱动程序以及每个执行程序的内存.
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6)))
val bcTable = sc.broadcast(map.collectAsMap)
val res2 = rdd.flatMap{arr => arr.map(elem => (elem, bcTable.value(elem)))}
Run Code Online (Sandbox Code Playgroud)