小编mwo*_*wol的帖子

Spark Streaming:广播变量,java.lang.ClassCastException


我尝试从存储在HDFS中的静态文本文件中读取数据,将其内容存储到ArrayBuffer中,而ArrayBuffer又应通过sparkContext.broadcast作为BroadcastVariable进行广播.我正在使用cloudera的spark,spark版本1.6.0-cdh5.7.0和spark-streaming_2.10.

我使用spark-submit启动纱线上的应用程序:

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar

当我这样做时,我得到一个 java.lang.ClassCastException:无法将scala.Some的实例分配给org.apache.spark.Accumulator实例中的scala.Option类型的字段org.apache.spark.Accumulable.name 相同的代码与硬编码ArrayBuffer一起使用工作完美,所以我认为它与静态文件资源有关...有谁知道我可能做错了什么?任何帮助赞赏.

这不起作用:


    object BroadcastStreamTest1 {

        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            val content = streamingContext.sparkContext
                .textFile("hdfs:///data/someTextFile.txt")
                .collect()
                .toBuffer[String]

            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

这有效:


    object BroadcastStreamTest2 {

        def main(args: Array[String]) {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            val content = …

scala broadcast hdfs apache-spark spark-streaming

7
推荐指数
1
解决办法
1557
查看次数

标签 统计

apache-spark ×1

broadcast ×1

hdfs ×1

scala ×1

spark-streaming ×1