KafkaUtils class not found in Spark Streaming

kah*_*hlo 10 sbt apache-kafka apache-spark

I have just started with Spark Streaming and am trying to build a sample application that counts words from a Kafka stream. Although it compiles with `sbt package`, when I run it I get a `NoClassDefFoundError`. This post appears to describe the same problem, but the solution there is for Maven and I have not been able to reproduce it with sbt.

KafkaApp.scala:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "zookeeper.connection.timeout.ms" -> "10000",
        "group.id" -> "sparkGroup"
    )

    val topics = Map(
        "test" -> 1
    )

    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
    // stream.count() returns a DStream[Long]; print the per-batch counts
    println(s"Number of words: ${messages.count()}")

    ssc.start()
    ssc.awaitTermination()
  }
}

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-streaming" % "1.1.1",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I submit it with:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar

The error:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.5.252:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Tat*_*Das 17

spark-submit does not automatically include the package that contains KafkaUtils. You need to have it in your project JAR. For that, you have to build an all-inclusive uber jar with sbt-assembly. Here is an example build.sbt.

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

You obviously also need to add the assembly plugin to SBT.

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
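A minimal setup along those lines might look like the following. This is a sketch, not taken from the linked repo: the sbt-assembly version is illustrative for sbt 0.13-era builds, and marking the Spark artifacts as `"provided"` (so the cluster's own Spark is used at runtime, keeping the uber jar small) is a common convention rather than a requirement.

project/plugins.sbt:

```scala
// sbt-assembly plugin; version is illustrative for sbt 0.13.x
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
```

build.sbt:

```scala
// Old-style (sbt-assembly 0.11.x) activation of the plugin's settings
import AssemblyKeys._

assemblySettings

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // "provided": supplied by spark-submit at runtime, excluded from the uber jar
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // NOT provided: this is the dependency that contains KafkaUtils,
  // so it must be bundled into the jar you submit
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)
```

Then build with `sbt assembly` and pass the resulting `target/scala-2.10/Simple Project-assembly-1.1.jar` (name depends on your settings) to spark-submit instead of the `sbt package` jar.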


小智 7

Please try including all the dependencies when submitting the application:

./spark-submit --name "SampleApp" \
  --deploy-mode client \
  --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
  spark-example-1.0-SNAPSHOT.jar