blu*_*sky 6 scala apache-spark
当我调用RDD的map函数时,没有应用.它对于scala.collection.immutable.List的预期工作,但不适用于RDD.以下是一些代码来说明:
val list = List ("a" , "d" , "c" , "d")
list.map(l => {
println("mapping list")
})
val tm = sc.parallelize(list)
tm.map(m => {
println("mapping RDD")
})
Run Code Online (Sandbox Code Playgroud)
上述代码的结果是:
mapping list
mapping list
mapping list
mapping list
Run Code Online (Sandbox Code Playgroud)
但是注意"映射RDD"没有打印到屏幕上.为什么会这样?
这是我试图从RDD填充HashMap的更大问题的一部分:
def getTestMap( dist: RDD[(String)]) = {
var testMap = new java.util.HashMap[String , String]();
dist.map(m => {
println("populating map")
testMap.put(m , m)
})
testMap
}
val testM = getTestMap(tm)
println(testM.get("a"))
Run Code Online (Sandbox Code Playgroud)
此代码打印为null
这是由于懒惰的评价?
懒惰的评估可能是其中的一部分,如果map是您正在执行的唯一操作.在RDD沿袭请求操作(以Spark术语表示)之前,Spark不会安排执行.
当你执行一个动作时,println会发生这种情况,但不会发生在你期望它的驱动程序上,而是在执行该关闭的slave上.尝试查看工人的日志.
hashMap在问题的第二部分,人口也发生了类似的事情.将在每个分区上,在单独的工作程序上执行相同的代码,并将序列化回驱动程序.鉴于Spark已经"清理"了闭包,可能testMap正在从序列化闭包中删除,导致a null.请注意,如果仅由于map未执行,则hashmap应为空,而不是null.
如果要将RDD的数据传输到另一个结构,则需要在驱动程序中执行此操作.因此,您需要强制Spark将所有数据传递给驱动程序.这是功能rdd.collect().
这适用于您的情况.请注意,所有RDD数据都应该适合您的驱动程序内存:
import scala.collection.JavaConverters._
def getTestMap(dist: RDD[(String)]) = dist.collect.map(m => (m , m)).toMap.asJava
Run Code Online (Sandbox Code Playgroud)