Spark Streaming exception: java.util.NoSuchElementException: None.get

and*_*ani

Spark Streaming writes data to HDFS by converting it into a DataFrame:

Code

object KafkaSparkHdfs {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF(); 
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")     
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created, but no files are written into it.

The program terminates with the following error:

    18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
    18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

In my pom I am using the respective dependencies (a build.sbt sketch of the same coordinates follows the list):

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11
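
For reference, a minimal build.sbt sketch of the same artifact coordinates (the question uses Maven, and the version number below is only a placeholder, not taken from the question):

// Hypothetical version; substitute the Spark release you actually build against
val sparkVersion = "2.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-sql"                  % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)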

Sha*_*ica

The error is caused by trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is intended mainly for testing and its use is discouraged. The solution to your problem is therefore to make sure the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create the SQLContext, which is fine. However, it is not used when creating the StreamingContext; the SparkConf is used instead.

Looking at the documentation, we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, by using the SparkConf as the parameter, a new SparkContext is created. Now there are two different contexts.
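
A minimal sketch of the difference, assuming the sparkConf and sc values defined in the question (the variable names here are only for illustration):

// Passing the SparkConf makes StreamingContext build a second SparkContext internally,
// which then exists alongside the `sc` created when the object is constructed:
val sscTwoContexts = new StreamingContext(sparkConf, Seconds(20))

// Passing the existing SparkContext reuses it, so only one context exists in the JVM:
val sscOneContext = new StreamingContext(sc, Seconds(20))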

The simplest solution here is to keep using the same context as before. Change the line where the StreamingContext is created to:

val ssc = new StreamingContext(sc, Seconds(20))

Note: In newer versions of Spark (2.0+), use SparkSession instead. A new streaming context can then be created with StreamingContext(spark.sparkContext, ...). It can look as follows:

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
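
Putting it together, a sketch of how the foreachRDD block could then look with the SparkSession's implicits in scope (the column names and the explicit Parquet format are illustrative assumptions, not taken from the question):

wordCount.foreachRDD { rdd =>
  // toDF comes from spark.implicits._ imported above; "word" / "count" are illustrative column names
  val dataframe = rdd.toDF("word", "count")
  dataframe.write
    .mode(SaveMode.Append)
    .parquet("hdfs://localhost:9000/newfile24") // Parquet is also what save() defaults to
}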