为什么 Spark 应用程序会失败,并显示“ClassNotFoundException:无法找到数据源:jdbc”作为带有 sbt 程序集的 uber-jar?

kor*_*e82 5 scala sbt sbt-assembly apache-spark apache-spark-sql

我正在尝试使用 sbt 1.0.4 和 sbt- assembly 0.14.6 来组装 Spark 应用程序。

Spark 应用程序在 IntelliJ IDEA 或 中启动时工作正常spark-submit,但如果我使用命令行(Windows 10 中的 cmd)运行组装的 uber-jar:

java -Xmx1024m -jar my-app.jar
Run Code Online (Sandbox Code Playgroud)

我得到以下异常:

线程“main”中出现异常 java.lang.ClassNotFoundException:找不到数据源:jdbc。请在http://spark.apache.org/third-party-projects.html找到软件包

Spark 应用程序如下所示。

package spark.main

import java.util.Properties    
import org.apache.spark.sql.SparkSession

object Main {

    def main(args: Array[String]) {
        val connectionProperties = new Properties()
        connectionProperties.put("user","postgres")
        connectionProperties.put("password","postgres")
        connectionProperties.put("driver", "org.postgresql.Driver")

        val testTable = "test_tbl"

        val spark = SparkSession.builder()
            .appName("Postgres Test")
            .master("local[*]")
            .config("spark.hadoop.fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
            .config("spark.sql.warehouse.dir", System.getProperty("java.io.tmpdir") + "swd")
            .getOrCreate()

        val dfPg = spark.sqlContext.read.
            jdbc("jdbc:postgresql://localhost/testdb",testTable,connectionProperties)

        dfPg.show()
    }
}
Run Code Online (Sandbox Code Playgroud)

以下是build.sbt

name := "apache-spark-scala"

version := "0.1-SNAPSHOT"

scalaVersion := "2.11.8"

mainClass in Compile := Some("spark.main.Main")

libraryDependencies ++= {
    val sparkVer = "2.1.1"
    val postgreVer = "42.0.0"
    val cassandraConVer = "2.0.2"
    val configVer = "1.3.1"
    val logbackVer = "1.7.25"
    val loggingVer = "3.7.2"
    val commonsCodecVer = "1.10"
    Seq(
        "org.apache.spark" %% "spark-sql" % sparkVer,
        "org.apache.spark" %% "spark-core" % sparkVer,
        "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConVer,
        "org.postgresql" % "postgresql" % postgreVer,
        "com.typesafe" % "config" % configVer,
        "commons-codec" % "commons-codec" % commonsCodecVer,
        "com.typesafe.scala-logging" %% "scala-logging" % loggingVer,
        "org.slf4j" % "slf4j-api" % logbackVer
    )
}

dependencyOverrides ++= Seq(
    "io.netty" % "netty-all" % "4.0.42.Final",
    "commons-net" % "commons-net" % "2.2",
    "com.google.guava" % "guava" % "14.0.1"
)

assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
}
Run Code Online (Sandbox Code Playgroud)

有谁知道,为什么?

[更新]

从官方 GitHub 存储库获取的配置成功了:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) =>
    xs map {_.toLowerCase} match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
          MergeStrategy.discard
      case "services" :: _ =>  MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
    case _ => MergeStrategy.first
}
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 4

问题几乎是为什么 format("kafka") 失败并显示“无法找到数据源:kafka”。与超级罐子?与其他OP使用Apache Maven创建uber-jar的区别是,这里是关于sbt的(准确地说是sbt-assemble插件的配置)。


数据源的短名称(又名别名jdbc),例如或,仅当相应的寄存器 akafka时才可用。META-INF/services/org.apache.spark.sql.sources.DataSourceRegisterDataSourceRegister

要使jdbc别名正常工作,Spark SQL 使用META-INF/services/org.apache.spark.sql.sources.DataSourceRegister以及以下条目(还有其他条目):

org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
Run Code Online (Sandbox Code Playgroud)

这就是将jdbc别名与数据源联系起来的原因。

并且您已通过以下命令将其从 uber-jar 中排除assemblyMergeStrategy

assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
}
Run Code Online (Sandbox Code Playgroud)

请注意case PathList("META-INF", xs @ _*)您只需MergeStrategy.discard。这就是根本原因。

只是为了检查“基础设施”是否可用并且您可以jdbc通过其完全限定名称(而不是别名)使用数据源,请尝试以下操作:

spark.read.
  format("org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider").
  load("jdbc:postgresql://localhost/testdb")
Run Code Online (Sandbox Code Playgroud)

由于缺少诸如 之类的选项,您会看到其他问题url,但是...我们离题了

解决方案是针对MergeStrategy.concat所有人的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister(这将创建一个包含所有数据源的 uber-jar,包括jdbc数据源)。

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
Run Code Online (Sandbox Code Playgroud)