Ste*_*rgm 5 scala apache-spark kafka-producer-api
I got an exception:
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
    at com.Boot$.test(Boot.scala:60)
    at com.Boot$.main(Boot.scala:36)
    at com.Boot.main(Boot.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@77624599)
    - field (class: com.Boot$$anonfun$test$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.Boot$$anonfun$test$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
// NearLineConfig and log come from the surrounding application (not shown)
// @transient
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// @transient
val sc = new SparkContext(sparkConf)
val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")
// @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")
// @transient
// The producer is created here, on the driver...
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
// ...but the closure below references it, so Spark has to serialize it to ship the task
requestSet.foreachPartition((partitions: Iterator[String]) => {
  partitions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception => {
        log.warn(ex.getMessage, ex)
      }
    }
  })
})
producer.close()
In this program I read records from an HDFS path and write them to Kafka. When I remove the code that sends records to Kafka, the job runs fine. What am I missing?
Yuv*_*kov 10
KafkaProducer is not serializable. You need to move the creation of the instance inside foreachPartition:
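requestSet.foreachPartition((partitions: Iterator[String]) => {
  // Created on the executor, once per partition, so it never has to be serialized
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
  try {
    partitions.foreach((line: String) => {
      try {
        producer.send(new ProducerRecord[String, String]("testtopic", line))
      } catch {
        case ex: Exception => {
          log.warn(ex.getMessage, ex)
        }
      }
    })
  } finally {
    // close() blocks until previously sent records complete, then releases the producer
    producer.close()
  }
})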
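The difference between the two versions can be reproduced without Spark or Kafka. The sketch below is a rough analogue of the check Spark performs in ClosureCleaner.ensureSerializable: it writes a function with plain Java serialization and reports whether that succeeds. FakeProducer, ClosureSerializationDemo and its helper names are hypothetical stand-ins, not Spark or Kafka APIs, and the sketch assumes Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for KafkaProducer: an ordinary class that is NOT Serializable
class FakeProducer {
  def send(line: String): Unit = ()
}

object ClosureSerializationDemo {
  // Rough analogue of Spark's check before shipping a task:
  // can the function be written with plain Java serialization?
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the question: the producer is created outside and captured by the closure
  def capturingClosure(producer: FakeProducer): Iterator[String] => Unit =
    lines => lines.foreach(line => producer.send(line))

  // Like the answer: the producer is created inside, so nothing unserializable is captured
  val creatingInsideClosure: Iterator[String] => Unit =
    lines => {
      val producer = new FakeProducer
      lines.foreach(line => producer.send(line))
    }

  // The answer's closure still captures props; a java.util.HashMap of Strings is Serializable
  def configClosure(props: java.util.HashMap[String, Object]): Iterator[String] => Unit =
    lines => {
      val producer = new FakeProducer // a real producer would be built from props here
      if (!props.isEmpty) lines.foreach(line => producer.send(line))
    }

  def main(args: Array[String]): Unit = {
    val props = new java.util.HashMap[String, Object]()
    props.put("bootstrap.servers", "localhost:9092") // placeholder value
    println(s"captures producer:   ${isSerializable(capturingClosure(new FakeProducer))}")
    println(s"creates it inside:   ${isSerializable(creatingInsideClosure)}")
    println(s"captures only props: ${isSerializable(configClosure(props))}")
  }
}
```

Only the first closure should fail, which corresponds to the "field ... name: producer$1" entry in the serialization stack of the original error.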
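Note that KafkaProducer.send returns a Future[RecordMetadata], so most send failures are reported through that future rather than thrown. The try/catch around send will mainly catch a SerializationException, raised when the key or value cannot be serialized.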
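To illustrate why the inner try/catch rarely fires, here is a Kafka-free sketch built on java.util.concurrent.CompletableFuture. fakeSend and AsyncErrorDemo are hypothetical stand-ins, not the Kafka API: like an asynchronous send, fakeSend returns immediately and records any failure inside the returned Future instead of throwing. With the real client you would inspect the Future (for example with get()) or pass a Callback to the two-argument send overload.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, Future}

object AsyncErrorDemo {
  // Hypothetical stand-in for an asynchronous send (not the Kafka API):
  // it returns at once and stores any failure inside the Future
  def fakeSend(value: String): Future[String] = {
    val result = new CompletableFuture[String]()
    if (value.isEmpty) result.completeExceptionally(new IllegalArgumentException("empty record"))
    else result.complete(s"ack:$value")
    result
  }

  // True if calling fakeSend itself throws (it never does)
  def throwsAtCall(value: String): Boolean =
    try { fakeSend(value); false } catch { case _: Exception => true }

  // True if the failure surfaces once the Future is waited on
  def throwsAtGet(value: String): Boolean =
    try { fakeSend(value).get(); false } catch { case _: ExecutionException => true }

  def main(args: Array[String]): Unit =
    println(s"empty record: thrown at call = ${throwsAtCall("")}, thrown at get() = ${throwsAtGet("")}")
}
```

Calling get() on every record would make the sends effectively synchronous; a Callback keeps them asynchronous while still letting you log failures.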