Why does a Spark application fail with "ClassNotFoundException: Failed to find data source: kafka" as an uber-jar built with sbt assembly?

ben*_*uin 21 scala sbt sbt-assembly apache-spark spark-structured-streaming

I am trying to run an example like https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. I started from the Spark Structured Streaming Programming Guide at http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

My code is:

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

I added the following sbt files:

build.sbt:

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

I also added project/assembly.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

This creates an uber-jar with the non-provided jars included.

I submit with the following command:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

But I get this runtime error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

Is there a way to know which class cannot be found, so that I can search the maven.org repo for that class?

The lookupDataSource source code seems to be at line 543 of https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link to the Kafka data source...
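
From the stack trace, the lookup seems to boil down to something like this (my simplified reading of Spark 2.0.x's DataSource.lookupDataSource, not the exact code), which would explain the kafka.DefaultSource in the Caused by:

import scala.util.Try

// Simplified sketch: Spark first tries the provider name as a class name,
// then falls back to "<provider>.DefaultSource" -- hence "kafka.DefaultSource".
def lookupDataSource(provider: String, loader: ClassLoader): Class[_] =
  Try(loader.loadClass(provider))
    .orElse(Try(loader.loadClass(provider + ".DefaultSource")))
    .getOrElse(throw new ClassNotFoundException(
      s"Failed to find data source: $provider."))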

The full source code is available here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

Jac*_*ski 20

The problem is the following section of build.sbt:

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.

But the META-INF files are exactly what makes kafka (and the aliases of other streaming data sources) work.
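
To see whether the registration survived the assembly, you can inspect the uber-jar directly (a quick check, using the jar name from the question):

# Print the alias registrations that made it into the uber-jar; with the
# discard-all strategy above, this prints nothing because the file was dropped.
unzip -p boontadata-spark-job1-assembly-0.1.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister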

For the kafka alias to work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.
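
Under the hood, the alias lookup is plain java.util.ServiceLoader; here is a minimal sketch of the mechanism (an illustration you could run in spark-shell, not Spark's exact code):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Load every provider registered via META-INF/services and look for the
// one whose short name is "kafka". If the services file was discarded
// from the jar, nothing is found and the alias cannot be resolved.
val providers = ServiceLoader
  .load(classOf[DataSourceRegister], Thread.currentThread().getContextClassLoader)
  .asScala
providers.find(_.shortName() == "kafka") match {
  case Some(p) => println(s"kafka -> ${p.getClass.getName}")
  case None    => println("no DataSourceRegister registers the 'kafka' alias")
}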

Just to check that the actual code is indeed available (but the "code" that registers the alias is not), you can use the kafka data source by its fully-qualified name (not the alias), as follows:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

You will see other problems because of the missing kafka.bootstrap.servers option, but... we digress.

One solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files (that would create an uber-jar with all data sources, including the kafka data source):

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat

  • Actually, it sounds like I managed to get it working as you suggested. I just had to order the "case"s in the merge strategy and put this one at the top. (3 upvotes)
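
Putting the answer and the comment together, a sketch of the corrected merge strategy for build.sbt (the concat case has to come before the catch-all META-INF discard, otherwise the file is dropped before the concat rule is ever reached):

// META-INF discarding, except for the service registrations that
// the data source aliases depend on
assemblyMergeStrategy in assembly := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" =>
    MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}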

小智 19

I tried it like this and it worked for me. Submit like this, and let me know if you have any issues:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
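
Adapted to the submit command from the question (assuming the Spark 2.0.2 / Scala 2.11 versions from its build.sbt), the same approach would be:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2 \
  boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic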

  • I would call this a workaround, and it does not explain why the uber-jar did not work. The root cause is the assemblyMergeStrategy in build.sbt, which discards all the META-INF files, including the registration. See the other answer. (2 upvotes)