解决Apache Spark中的依赖性问题

use*_*271 30 java scala nosuchmethoderror classnotfoundexception apache-spark

构建和部署Spark应用程序时的常见问题是:

  • java.lang.ClassNotFoundException.
  • object x is not a member of package y 编译错误.
  • java.lang.NoSuchMethodError

如何解决这些问题?

Tza*_*har 28

Apache Spark的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它易受此类问题的攻击.@ user7337271的答案是正确的,但还有一些问题,具体取决于你正在使用的集群管理器("master").

首先,Spark应用程序由这些组件组成(每个组件都是一个单独的JVM,因此可能在其类路径中包含不同的类):

  1. 驱动程序:这是您的应用程序创建SparkSession(或SparkContext)并连接到集群管理器以执行实际工作
  2. Cluster Manager:作为集群的"入口点",负责为每个应用程序分配执行程序.Spark支持几种不同的类型:独立,YARN和Mesos,我们将在下面描述.
  3. 执行者:这些是集群节点上的进程,执行实际工作(运行Spark 任务)

Apache Spark的集群模式概述在此图中描述了它们之间的关系:

群集模式概述

现在 - 哪些类应该驻留在每个组件中?

这可以通过下图解答:

课程安排概述

我们慢慢地解析一下:

  1. Spark Code是Spark的库.它们应该存在于所有三个组件中,因为它们包含让Spark执行它们之间通信的粘合剂.顺便说一句 - Spark作者做出了一个设计决定,要求在所有组件中包含所有组件的代码(例如,包括只应在驱动程序中的Executor中运行的代码)以简化这一过程 - 所以Spark的"胖罐"(版本高达1.6) )或"归档"(在2.0中,详细信息如下)包含所有组件的必要代码,并且应该在所有组件中都可用.

  2. 仅驱动程序代码这是用户代码,不包括应在Executors上使用的任何内容,即RDD/DataFrame/Dataset上的任何转换中未使用的代码.这不一定必须与分布式用户代码分开,但也可以.

  3. 分布式代码这是用驱动程序代码编译的用户代码,但也必须在执行程序上执行 - 实际转换使用的所有内容都必须包含在此jar中.

既然我们已经做到了这一点,我们如何让每个组件中的类正确加载,以及它们应遵循哪些规则?

  1. Spark代码:如前所述,您必须在所有组件中使用相同的ScalaSpark版本.

    1.1在独立模式下,应用程序(驱动程序)可以连接"预先存在的"Spark安装.这意味着所有驱动程序必须使用在主服务器和执行程序上运行的相同Spark版本.

    1.2在YARN/Mesos中,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的版本.这意味着如果您使用版本X来编译和打包驱动程序应用程序,则应在启动SparkSession时提供相同的版本(例如,使用YARN时通过spark.yarn.archivespark.yarn.jars参数).您提供的jar/archive应该包括所有Spark依赖项(包括传递依赖项),并且在应用程序启动时,集群管理器会将其提供给每个执行程序.

  2. 驱动程序代码:完全取决于 - 驱动程序代码可以作为一堆罐子或"胖罐"发货,只要它包含所有Spark依赖项+所有用户代码

  3. 分布式代码:除了存在于驱动程序之外,还必须将此代码发送给执行程序(同样,还要包含其所有传递依赖项).这是使用spark.jars参数完成的.

总而言之,这是建议和部署Spark应用程序的建议方法(在本例中 - 使用YARN):

  • 使用您的分布式代码创建一个库,将其打包为"常规"jar(带有描述其依赖项的.pom文件)和"胖jar"(包含所有传递依赖项).
  • 创建一个驱动程序应用程序,在分布式代码库和Apache Spark上使用编译依赖项(具有特定版本)
  • 将驱动程序应用程序打包到胖jar中以部署到驱动程序
  • 启动时,将正确版本的分布式代码作为spark.jars参数值SparkSession
  • 将包含lib/下载的Spark二进制文件夹下所有jar的存档文件(例如gzip)的位置作为值传递spark.yarn.archive


use*_*271 18

构建和部署Spark应用程序时,所有依赖项都需要兼容版本.

  • Scala版本.所有包都必须使用相同的主要(2.10,2.11,2.12)Scala版本.

    考虑以下(不正确)build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Run Code Online (Sandbox Code Playgroud)

    我们使用spark-streamingScala 2.10,而剩余的包用于Scala 2.11.一个有效的文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Run Code Online (Sandbox Code Playgroud)

    但最好全局指定版本并使用%%:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    
    Run Code Online (Sandbox Code Playgroud)

    同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
    Run Code Online (Sandbox Code Playgroud)
  • Spark版本所有软件包都必须使用相同的主要Spark版本(1.6,2.0,2.1,...).

    考虑以下(不正确的)build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Run Code Online (Sandbox Code Playgroud)

    我们使用spark-core1.6而其余组件都在Spark 2.0中.一个有效的文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Run Code Online (Sandbox Code Playgroud)

    但最好使用变量:

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    
    Run Code Online (Sandbox Code Playgroud)

    同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
    Run Code Online (Sandbox Code Playgroud)
  • Spark依赖项中使用的Spark版本必须匹配Spark安装的Spark版本.例如,如果在群集上使用1.6.1,则必须使用1.6.1来构建jar.不一定接受次要版本不匹配.

  • 用于构建jar的Scala版本必须与用于构建部署Spark的Scala版本相匹配.默认情况下(可下载的二进制文件和默认构建):

    • Spark 1.x - > Scala 2.10
    • Spark 2.x - > Scala 2.11
  • 如果包含在胖罐中,则应该可以在工作节点上访问其他包.有很多选择,包括:

    • --jars参数spark-submit- 分发本地jar文件.
    • --packages参数spark-submit- 从Maven存储库获取依赖项.

    当群集节点提交你应该包括应用jar--jars.