serializable对象的用法:引起:java.io.NotSerializableException

duc*_*ito 6 scala apache-spark

我按照本教程和其他有关任务序列化的类似教程,但我的代码失败并出现Task serialization错误.我不明白为什么会这样.我在topicOutputMessages外面设置变量foreachRDD,然后我在里面读它foreachPartition.我也创建了KafkaProducerINSIDE foreachPartition.那么,这里的问题是什么?真的不能说明问题.

al topicsSet = topicInputMessages.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)


messages.foreachRDD(rdd => {
    rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
              val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
              producer.send(record)
        }
        producer.close()
    }
})
Run Code Online (Sandbox Code Playgroud)

编辑:

object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String, String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

 def createProducer: KafkaProducer[String, String] = {

    val  kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes)
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String, String](kafkaProps)
  }

}
Run Code Online (Sandbox Code Playgroud)

完整的堆栈跟踪:

16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
Run Code Online (Sandbox Code Playgroud)

Nie*_*and 1

我认为问题出在你的UtilsDM班级上。它被闭包捕获,Spark 尝试序列化代码以将其发送给执行器。

尝试使 UtilsDM 可序列化或在 foreachRDD 函数中创建它。