java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

lu_*_*rra 2 multithreading scala apache-kafka apache-spark spark-streaming

I have a Scala Spark Streaming application that receives data from 3 different Kafka producers.

The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server runs on the machine with host 0.0.0.178, and the Kafka producers run on the machines 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I have read many different posts about this, but nobody seems to have found a solution to the problem.

How can I handle this problem in my application? Do I need to change some parameters on Kafka (currently the parameter num.partition is set to 1)?

Here is the code of my application:

// Create the context with a 3 second batch size
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(3))

case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)    


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "0.0.0.178:9092",
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "group.id" -> "test_luca",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")

  val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(record => {
    implicit val formats = DefaultFormats
    parse(record.value).extract[Sensors1]
  } 
  )      
  s1.print()
  s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()

Thanks

Yuv*_*kov 5

Your problem is here:

s1.print()
s1.saveAsTextFiles("results/", "")

Spark builds a graph of the streaming computation, and here you define two streams:

Read from Kafka -> Print to console
Read from Kafka -> Save to text file

Spark will try to run both of these graphs simultaneously, since they are independent of each other. And because the Kafka integration uses a cached-consumer approach, it effectively ends up using the same consumer for both stream executions.

What you can do is cache the DStream before running the two queries:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)

val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
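The cache() call makes Spark materialize each Kafka batch once and reuse it for both output operations, so only a single consumer reads each partition. If you want control over where the cached data is kept, the same idea can be written with an explicit storage level (a minimal sketch, not from the original answer; StorageLevel.MEMORY_AND_DISK is just one possible choice):

import org.apache.spark.storage.StorageLevel

// persist() with an explicit level is equivalent to cache(), which uses the DStream default
val cachedStream = dataFromKafka.persist(StorageLevel.MEMORY_AND_DISK)
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")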