cov*_*efe 13 scala apache-kafka apache-spark spark-streaming
我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)流并获取此AbstractMethodError错误:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
Run Code Online (Sandbox Code Playgroud)
这就是我所说的:
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[IntegerDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest"
)
val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
Run Code Online (Sandbox Code Playgroud)
我有Kafka在9092运行,我能够创建生产者和消费者,并在他们之间传递消息,所以不知道为什么它不能使用Scala代码.任何想法都赞赏.
cov*_*efe 19
事实证明我使用的是Spark 2.3,我应该使用Spark 2.2.显然这个方法在后来的版本中是抽象的,所以我得到了那个错误.
我有同样的例外,在我的情况下,我创建了依赖于spark-streaming-kafka-0-10_2.11版本的应用程序jar 2.1.0,同时尝试部署到Spark 2.3.0集群.
| 归档时间: |
|
| 查看次数: |
7800 次 |
| 最近记录: |