blu*_*sky 13 scala apache-spark
sessionIdList的类型是:
scala> sessionIdList res19:org.apache.spark.rdd.RDD [String] = MappedRDD [17] at distinct at:30
当我尝试运行以下代码时:
scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
Run Code Online (Sandbox Code Playgroud)
我收到例外:
val x = sc.parallelize(List(1,2,3))
val cartesianComp = x.cartesian(x).map(x => (x))
val kDistanceNeighbourhood = sessionIdList.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Run Code Online (Sandbox Code Playgroud)
但是,如果我使用:
14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Run Code Online (Sandbox Code Playgroud)
然后没有显示异常
两个代码片段之间的区别在于,在第一个片段中,sessionIdList的类型为:
val l = sc.parallelize(List("1","2"))
val kDistanceNeighbourhood = l.map(s => {
cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)
Run Code Online (Sandbox Code Playgroud)
在第二个片段"l"是类型
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
Run Code Online (Sandbox Code Playgroud)
为什么会出现这个错误?
我是否需要将sessionIdList转换为ParallelCollectionRDD才能解决此问题?
Jos*_*sen 20
Spark不支持嵌套RDD(请参阅/sf/answers/989137411/以解决同一问题),因此您无法对其他RDD操作中的RDD执行转换或操作.
在第一种情况下,当工作程序尝试访问仅存在于驱动程序而不是工作程序上的SparkContext对象时,您会看到工作程序抛出NullPointerException.
在第二种情况下,我的预感是在驱动程序上本地运行的工作并且纯粹是偶然地工作.
小智 9
这是一个合理的问题,我听到它有足够的时间问过这个问题.我将尝试解释为什么这是真的,因为它可能会有所帮助.
嵌套的RDD将始终在生产中引发异常.嵌套函数调用,因为我认为你在这里描述它们,如果它意味着在RDD操作中调用RDD操作,也会导致失败,因为它实际上是同样的事情.(RDD是不可变的,因此执行诸如"map"的RDD操作等同于创建新的RDD.)创建嵌套RDD的能力是定义RDD的方式和Spark应用程序的方式的必然结果.建立.
RDD是Spark Executors上的对象(称为分区)的分布式集合.Spark执行器无法与Spark驱动程序相互通信.RDD操作都是在这些分区上分段计算的.因为RDD的执行器环境不是递归的(即你可以将Spark驱动程序配置为带有子执行程序的spark执行器),RDD也不能.
在您的程序中,您已创建了一个分布式整数分区集合.然后,您正在执行映射操作.当Spark驱动程序看到映射操作时,它会发送指令以执行映射到执行程序,执行程序并行执行每个分区的转换.但是你的映射无法完成,因为在每个分区上你都试图调用"整个RDD"来执行另一个分布式操作.这是不可能的,因为每个分区都无法访问其他分区上的信息,如果确实如此,则计算无法并行运行.
您可以做什么,因为您在地图中需要的数据可能很小(因为您正在进行过滤,并且过滤器不需要任何有关sessionIdList的信息)是首先过滤会话ID列表.然后将该列表收集到驱动程序.然后将其广播到执行程序,您可以在地图中使用它.如果sessionID列表太大,您可能需要进行连接.