我尝试从存储在HDFS中的静态文本文件中读取数据,将其内容存储到ArrayBuffer中,而ArrayBuffer又应通过sparkContext.broadcast作为BroadcastVariable进行广播.我正在使用cloudera的spark,spark版本1.6.0-cdh5.7.0和spark-streaming_2.10.
我使用spark-submit启动纱线上的应用程序:
spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar
当我这样做时,我得到一个 java.lang.ClassCastException:无法将scala.Some的实例分配给org.apache.spark.Accumulator实例中的scala.Option类型的字段org.apache.spark.Accumulable.name 相同的代码与硬编码ArrayBuffer一起使用工作完美,所以我认为它与静态文件资源有关...有谁知道我可能做错了什么?任何帮助赞赏.
object BroadcastStreamTest1 {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))
val content = streamingContext.sparkContext
.textFile("hdfs:///data/someTextFile.txt")
.collect()
.toBuffer[String]
val broadCastVar = streamingContext.sparkContext.broadcast(content)
broadCastVar.value.foreach(line => println(line))
streamingContext.start()
streamingContext.awaitTermination()
}
}
object BroadcastStreamTest2 {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))
val content = …