Not serializable exception when reading Kafka records with Spark Streaming

Ser*_*eyB 5 apache-kafka apache-spark spark-streaming

I'm getting the following error when streaming from Kafka with Spark 2.0:

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...>

Here are the relevant parts of the code:

// spark-streaming-kafka-0-10 imports (kafkaParams is defined elsewhere)
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(2))

val topics = Array("ecfs")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream
  .map(_.value())          // extract the String payload from each ConsumerRecord
  .flatMap(message =>  {
    // parsing here...
  })
  .foreachRDD(rdd => {
    // processing here...
  })

ssc.start()

As far as I can tell, it is the .map(_.value()) line that is causing the problem. How can this be fixed?

Vij*_*hna 1

You can't use .map the way you are using it there on the DStream[ConsumerRecord[String, String]] returned by createDirectStream. I think you can use transform and then apply map, like this:

val streamed_rdd_final = streamed_rdd.transform { rdd =>
  rdd.map(x => x.split("\t"))
     .map(x => Array(check_time_to_send.toString, check_time_to_send_utc.toString,
                     x(1), x(2), x(3), x(4), x(5)))
     .map(x => x(1) + "\t" + x(2) + "\t" + x(3) + "\t" + x(4) + "\t" + x(5) + "\t" + x(6) + "\t" + x(7) + "\t")
}
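
Applied to the stream from the question, the same transform-then-map idea might look like the following minimal sketch (assuming the Kafka 0-10 direct stream, where each element is a ConsumerRecord[String, String]; the name values is made up for illustration):

// Extract the plain String payload inside transform so that no
// non-serializable ConsumerRecord leaves the mapping stage.
val values = stream.transform { rdd =>
  rdd.map(record => record.value)   // record: ConsumerRecord[String, String]
}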

Or you can keep using .map as you did, but instead of _.value() you should try passing a function into map, as I did below:

stream.map{case (x, y) => (y.toString)}
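
Note that with the 0-10 direct stream used in the question, the elements are ConsumerRecord objects rather than (key, value) tuples, so a tuple pattern will not match them directly. A minimal sketch of the equivalent, assuming that API, would be:

// Pull the key and value out of each ConsumerRecord explicitly;
// both are plain (serializable) Strings here.
stream.map(record => (record.key, record.value))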