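The variable I share through a broadcast is null in the cluster.
My application is quite complex, but I wrote this small example. It runs perfectly when I run it locally, but it fails in the cluster: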
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  // A small integer collection [1, 2, 3, 4] is parallelized into two partitions
  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

  // NullPointerException in the flatMap: broadcasted is null
  rdd.flatMap((a : Int) => List((a, broadcasted.value(0))))
    .reduceByKey(_+_)
    .collect()
    .foreach(println)
}
I don't know whether the problem is a coding error or a configuration issue.
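On the coding side, one thing that may be worth ruling out is the use of `extends App`. In Scala 2 the body of an `App` object is initialized lazily, and the Spark documentation recommends defining an explicit `main()` method instead of extending `scala.App`. The sketch below is a hypothetical variant of the example (the object name `PruebaBroadcastMain` is an assumption, not part of the original project) that keeps every value local to `main`. It has not been verified against the cluster described here.

```scala
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical variant of PruebaBroadcast2: same logic, but with an explicit
// main method instead of `extends App`.
object PruebaBroadcastMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PruebaBroadcastMain")
    val sc = new SparkContext(conf)
    try {
      // Local vals are captured by the closure directly, rather than being
      // read from fields of a lazily initialized singleton object.
      val broadcasted = sc.broadcast((6 to 9).toArray)
      val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)

      rdd.flatMap(a => List((a, broadcasted.value(0))))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
```

If this variant works on the cluster while the `extends App` version does not, that would point to a coding issue rather than a configuration one.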
This is the stack trace I get:
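15/07/07 20:55:13 …

I have a Spark 1.4.0 project in which I am trying to parse several JSON records containing a timestamp field and store it in a ZonedDateTime object, using Jackson and the JSR-310 module. If I run the driver from the IDE (IntelliJ IDEA 14.0), it works correctly, but if I build the jar with sbt assembly and then run it with spark-submit, I get the following exception: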
15/07/16 14:13:03 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.AbstractMethodError: com.mycompany.input.EventParser$$anonfun$1$$anon$1.com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$_setter_$com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$$typeCache_$eq(Lorg/spark-project/guava/cache/LoadingCache;)V
at com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper$class.$init$(ScalaObjectMapper.scala:50)
at com.mycompany.input.EventParser$$anonfun$1$$anon$1.<init>(EventParser.scala:27)
at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:27)
at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:24)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
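I have tried several versions of sbt-assembly, Jackson, and Spark, but with no luck. I guess this is related to a dependency conflict between Spark and my project (somehow involving the Guava library). Any ideas?
Thanks!
EDIT: Here is a sample project that reproduces the problem.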
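The mangled setter name in the trace mentions `org/spark-project/guava/cache/LoadingCache`. This suggests that the `ScalaObjectMapper` trait loaded at runtime was built against Spark's relocated copy of Guava, while the application class was compiled against a jackson-module-scala that uses the original Guava package. One way to test that hypothesis is to relocate the Jackson classes inside the application jar so they cannot clash with the copies on Spark's classpath. The fragment below is a sketch only: it assumes sbt-assembly 0.14.0 or later (the first release with shading support), and the target prefix `shadedjackson` is an arbitrary, hypothetical name.

```scala
// build.sbt (fragment); assumes the sbt-assembly plugin, version 0.14.0 or later

// Rename every Jackson class bundled in the fat jar, along with all references
// to it, so the application uses its own copy rather than the one Spark ships.
// "shadedjackson" is a made-up prefix chosen for illustration.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.fasterxml.jackson.**" -> "shadedjackson.@1").inAll
)
```

After rebuilding with `sbt assembly`, the stack trace should no longer reference `com.fasterxml.jackson` at all if the rule took effect. If the same error persists under the renamed package, the conflict lies elsewhere.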