Spark Streaming checkpointing throws a NotSerializableException

Sha*_*kar · 1 · tags: scala, spark-streaming

We are using the Spark Streaming receiver-based approach, and we have just enabled checkpointing to fix a data-loss problem.

The Spark version is 1.6.1, and we are consuming messages from a Kafka topic.

I am using ssc inside the foreachRDD method of the DStream, and this throws a NotSerializableException.

I tried extending the Serializable class, but I still get the same error. It happens only when we enable checkpointing.

The code is:

def main(args: Array[String]): Unit = {
  val checkPointLocation = "/path/to/wal"
  val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))
  ssc.start()
  ssc.awaitTermination()
}

def createContext(checkPointLocation: String): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("Test")
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(sparkConf, Seconds(40))
  ssc.checkpoint(checkPointLocation)
  val sc = ssc.sparkContext
  val sqlContext: SQLContext = new HiveContext(sc)
  val kafkaParams = Map(
    "group.id" -> groupId,
    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    "metadata.broker.list" -> brokerList,
    "zookeeper.connect" -> zookeeperURL)
  val dStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
  dStream.foreachRDD(rdd => {
    // Using sparkContext / sqlContext to do any operation here throws the error.
    // Convert RDD[String] to RDD[Row] and create a schema for the RDD.
    sqlContext.createDataFrame(rdd, schema)
  })
  ssc
}

Error log:

2017-02-08 22:53:53,250 ERROR [Driver] streaming.StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1c5e3677)
    - field (class: com.x.payments.RemedyDriver$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.x.payments.RemedyDriver$$anonfun$main$1, )
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, )
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@68866c5)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@68866c5))
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [0 checkpoint files])
    [the ForEachDStream / ArrayBuffer / DStreamCheckpointData frames above repeat several more times and are elided here]
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32))
    - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@6935641e)
    - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@484bf033)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at com.x.payments.RemedyDriver$.main(RemedyDriver.scala:104)
    at com.x.payments.RemedyDriver.main(RemedyDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
2017-02-08 22:53:53,255 INFO [Driver] yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0

Update:

Basically what we are doing is converting the rdd to a DataFrame [inside the DStream's foreachRDD method], then applying the DataFrame API on it, and finally storing the data in Cassandra. We use sqlContext to convert the rdd to a DataFrame, and that is where the error is thrown.
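For reference, the conversion described in the update can be sketched as below, building the SQLContext from the RDD's own SparkContext inside foreachRDD instead of capturing the driver-side one. This is only a sketch: the single-column `payload` schema is an assumption for illustration, and the real schema and Cassandra write depend on the actual Kafka message format.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.DStream

object BatchToDataFrame {
  // Hypothetical single-column schema; the real one depends on the payload.
  val schema = StructType(Seq(StructField("payload", StringType, nullable = true)))

  def writeEachBatch(dStream: DStream[String]): Unit =
    dStream.foreachRDD { rdd =>
      // Derive the context from the RDD so no driver-side object
      // is captured in the checkpointed closure.
      val sqlContext = new HiveContext(rdd.sparkContext)
      val rowRDD: RDD[Row] = rdd.map(Row(_))
      val df = sqlContext.createDataFrame(rowRDD, schema)
      // DataFrame operations and the Cassandra write would follow here.
      df.count()
    }
}
```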

Yuv*_*kov 5

If you want to access the SparkContext, do it through the rdd value:

dStream.foreachRDD(rdd => {
  val sqlContext = new HiveContext(rdd.context)
  val dataFrameSchema = sqlContext.createDataFrame(rdd, schema)
})

Whereas this:

dStream.foreachRDD(rdd => {
  // Using the driver-side sparkContext / sqlContext here throws the error.
  val numRDD = sc.parallelize(1 to 10, 2)
  log.info("NUM RDD COUNT:" + numRDD.count())
})

causes the SparkContext to be pulled into the serialized closure, which fails because SparkContext is not serializable.
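The failure mode itself is not Spark-specific; plain Java serialization behaves the same way, and this is essentially the check StreamingContext runs over the DStream graph when checkpointing is enabled. A minimal sketch (assuming Scala 2.12+, where lambdas are serializable by default): a closure that captures a non-serializable local object fails to serialize, while an otherwise identical closure that captures nothing succeeds.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

object ClosureSerializationDemo {
  // Stand-in for SparkContext: any class that does not extend Serializable.
  class NotSerializableThing

  def trySerialize(obj: AnyRef): Try[Unit] = Try {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(obj)
    oos.close()
  }

  // Captures the local `thing`, like a foreachRDD body that
  // references the driver's SparkContext.
  def makeBadClosure(): Int => Int = {
    val thing = new NotSerializableThing
    x => { thing.hashCode(); x + 1 }
  }

  // Captures nothing non-serializable.
  def makeGoodClosure(): Int => Int =
    x => x + 1

  def main(args: Array[String]): Unit = {
    println(s"bad fails:    ${trySerialize(makeBadClosure()).isFailure}")
    println(s"good succeeds: ${trySerialize(makeGoodClosure()).isSuccess}")
  }
}
```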