Spark 2.x + Tika: java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect


I am trying to work through a spark-submit classpath issue that shows up at runtime for an Apache Tika (> v1.14) parsing job. The problem appears to come down to the spark-submit classpath versus what is packaged in my uber-jar.

Platform: CDH 5.15 (Spark 2.3 added per the CDH documentation) and CDH 6 (Spark 2.2 bundled with CDH 6)

Things I have tried / reviewed:

(Cloudera) Where does spark-submit look for jar files?

(Stack Overflow) Resolving dependency problems in Apache Spark

(Stack Overflow) Apache Tika ArchiveStreamFactory.detect error

Highlights:

  • Java 8 / Scala 2.11
  • I am building an uber-jar and invoking that uber-jar via spark-submit
  • I have tried adding the --jars option to the spark-submit call (see further down in this post)
  • I have tried adding --conf spark.driver.userClassPathFirst=true and --conf spark.executor.userClassPathFirst=true to the spark-submit call (see further down in this post):

If I include those --conf flags in the spark-submit call, the result is:

$ spark-submit --master local[*] --class com.example.App --conf spark.executor.userClassPathFirst=true ./target/uber-tikaTest-1.19.jar

18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Below the next error message are the following files:

  • the build-and-run.sh script (which calls spark-submit; the options tried are noted in comments)
  • the sample application
  • the pom.xml
  • the mvn dependency-tree output (showing that the "missing" commons-compress library is in fact included in the uber-jar; a quick way to verify this is sketched just below this list)
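
As a double-check, one way (a sketch of my own, not output from the post) to confirm which commons-compress classes actually ended up inside the built uber-jar is to list its entries; the pom.properties lookup only works if the shade configuration keeps META-INF/maven entries:

# List the commons-compress class entries packaged in the uber-jar
unzip -l target/uber-tikaTest-1.19.jar | grep 'org/apache/commons/compress' | head
# Print the packaged commons-compress version, if its pom.properties was kept
unzip -p target/uber-tikaTest-1.19.jar \
    META-INF/maven/org.apache.commons/commons-compress/pom.properties 2>/dev/null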

The error at runtime:

18/09/25 11:47:39 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
    at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
    at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
    at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
    at com.example.App$.tikaAutoDetectParser(App.scala:55)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/09/25 11:47:39 ERROR executor.Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
    at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
    at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
    at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
    at com.example.App$.tikaAutoDetectParser(App.scala:55)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

build-and-run.sh:

Notes:

  • I have tried adding the --conf flags for userClassPathFirst to both the local-master and the YARN invocations below,
  • and using the --jars flag to point at the uber-jar produced by mvn compile with the pom.xml (provided further down in this post)

build-and-run.sh

mvn compile

if true
then
spark-submit --master local[*] --class com.example.App ./target/uber-tikaTest-1.19.jar
fi

# tried the using the userClass flags for driver and executor for above and below calls to spark-submit
# --conf spark.driver.userClassPathFirst=true \
# --conf spark.executor.userClassPathFirst=true \

if false 
then
spark-submit --class com.example.App \
 --master yarn \
 --packages org.apache.commons:commons-compress:1.18 \
 --jars ./target/uber-tikaTest-1.19.jar \
 --num-executors 2 \
 --executor-memory 1024m \
 --executor-cores 2 \
 --driver-memory 2048m \
 --driver-cores 1 \
 ./target/uber-tikaTest-1.19.jar
fi

Sample application:

package com.example
////////// Tika Imports
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.AutoDetectParser
import org.apache.tika.sax.BodyContentHandler
////////// Java HTTP Imports 
import java.net.URL;
import java.net.HttpURLConnection
import scala.collection.JavaConverters._
import scala.collection.mutable._
////////// Spark Imports 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.{Row,SparkSession}


object App {
  case class InputStreamData(sourceURL: String, headerFields: Map[String,List[String]], inputStream: java.io.InputStream)

  def openUrlStream(sourceURL: String, apiKey: String): InputStreamData = {
    try {
      val url = new URL(sourceURL)
      val urlConnection = url.openConnection().asInstanceOf[HttpURLConnection]
      urlConnection.setInstanceFollowRedirects(true)
      val headerFields = urlConnection.getHeaderFields()
      val input = urlConnection.getInputStream()
      InputStreamData(sourceURL, headerFields.asScala.map(x => (x._1, x._2.asScala.toList)), input)
    } catch {
      case e: Exception => {
        println("**********************************************************************************************")
        println("PARSEURL: INVALID URL: " + sourceURL)
        println(e.toString())
        println("**********************************************************************************************")

        InputStreamData(sourceURL, Map("ERROR" -> List("ERROR")), null)
      }
    }
  }

  def tikaAutoDetectParser(inputStream:java.io.InputStream):String = {
    var parser = new AutoDetectParser();
    var handler = new BodyContentHandler(-1);
    var metadata = new Metadata();
    parser.parse(inputStream, handler, metadata);
    return handler.toString()
  }

  def main(args : Array[String]) {
    var sparkConf = new SparkConf().setAppName("tika-1.19-test")
    val sc = new SparkContext(sparkConf) 
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    println("HELLO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    var urls = List("http://www.pdf995.com/samples/pdf.pdf", "https://www.amd.com/en", "http://jeroen.github.io/images/testocr.png")

    var rdd = sc.parallelize(urls)
    var parsed = rdd.map(x => tikaAutoDetectParser(openUrlStream(x,"").inputStream))
    println(parsed.count)
  }
}

pom.xml (builds the uber-jar):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>tikaTest</artifactId>
  <version>1.19</version>
  <name>${project.artifactId}</name>
  <description>Testing tika 1.19 with CDH 6 and 5.x, Spark 2.x, Scala 2.11.x</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>


 <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

<profiles>
    <profile>
        <id>scala-2.11.12</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <scalaVersion>2.11.12</scalaVersion>
            <scalaBinaryVersion>2.11.12</scalaBinaryVersion>
        </properties>
        <dependencies>
            <!-- ************************************************************************** -->
            <!-- GOOD DEPENDENCIES +++++++++++++++++++++++++++++++++++++ -->
            <!-- ************************************************************************** -->

            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
                <version>1.18</version>
            </dependency>

            <!-- *************** CDH flavored dependencies ***********************************************-->
            <!-- https://www.cloudera.com/documentation/spark2/latest/topics/spark2_packaging.html#versions -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0.cloudera3</version>
                <!-- have tried scope provided / compile -->
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.2.0.cloudera3</version>
                    <!-- have tried scope provided / compile -->
                    <!--<scope>provided</scope>-->
                </dependency>

                <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-core -->
                <dependency>
                    <groupId>org.apache.tika</groupId>
                    <artifactId>tika-core</artifactId>
                    <version>1.19</version>
                </dependency>

                <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-parsers -->
                <dependency>
                    <groupId>org.apache.tika</groupId>
                    <artifactId>tika-parsers</artifactId>
                    <version>1.19</version>
                </dependency>

                <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
                <dependency>
                    <groupId>javax.ws.rs</groupId>
                    <artifactId>javax.ws.rs-api</artifactId>
                    <version>2.1.1</version>
                </dependency>

            <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
                <dependency>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                    <version>2.11.12</version>
                </dependency>


            <!-- **************************************************************************************************************************
            **************************** alternative dependencies that have been tried and yield same Tika error***************************
            *******************************************************************************************************************************-->
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
                <!--
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.2.0</version>
                </dependency>
                -->

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
                <!--
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.2.0</version>
                </dependency>
                -->

            </dependencies>
        </profile>
    </profiles>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
                        <arg>-nobootcp</arg>
                    </args>
                </configura

Ami*_*rHd 1

What is the source of the exception?

It is the result of a dependency conflict.

Why can't I resolve it through my pom.xml?

Because this is not an internal conflict inside your jar file; it is a conflict with Apache Spark itself.
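
A quick way to see what Spark itself puts on the classpath is to list the jars shipped with the Spark 2 installation. This is a sketch of my own; the parcel paths below are assumptions about typical CDH layouts and may differ on your cluster:

# CDH 5.x with the separate SPARK2 parcel (assumed path; adjust to your install)
ls /opt/cloudera/parcels/SPARK2/lib/spark2/jars/ | grep commons-compress
# CDH 6.x, where Spark 2 is bundled in the main CDH parcel (assumed path)
ls /opt/cloudera/parcels/CDH/lib/spark/jars/ | grep commons-compress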

What exactly is going wrong?

The Spark 2.x distribution ships an older version of commons-compress, while the Tika library depends on commons-compress 1.18.
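
To confirm at runtime which copy actually wins, a small probe job can print the jar that ArchiveStreamFactory is loaded from on the driver and on the executors. This is a minimal sketch of my own (the object name is made up), not part of the original application:

package com.example

import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.spark.{SparkConf, SparkContext}

object WhichCommonsCompress {
  // Returns the jar (or directory) the given class was loaded from.
  def codeSourceOf(cls: Class[_]): String = {
    val src = cls.getProtectionDomain.getCodeSource
    if (src == null) "<bootstrap classpath>" else src.getLocation.toString
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("commons-compress-probe"))
    // Where the driver JVM resolves the class from.
    println("driver:   " + codeSourceOf(classOf[ArchiveStreamFactory]))
    // Where the executor JVMs resolve it from.
    sc.parallelize(1 to 2, 2)
      .map(_ => codeSourceOf(classOf[ArchiveStreamFactory]))
      .collect()
      .distinct
      .foreach(loc => println("executor: " + loc))
  }
}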

Solution

Use the --driver-class-path parameter of spark-shell or spark-submit to point at the correct version of the commons-compress library:

spark-submit \
    --driver-class-path ~/.m2/repository/org/apache/commons/commons-compress/1.18/commons-compress-1.18.jar \
    --class {your.main.class}
    ....
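
Note that the NoSuchMethodError in the question is raised inside executor tasks, and --driver-class-path only prepends to the driver's classpath. On YARN the executors will likely need the same jar via spark.executor.extraClassPath. A hedged sketch (the jar path is a placeholder; with extraClassPath it must already be present at that path on each worker node):

spark-submit \
    --driver-class-path /path/to/commons-compress-1.18.jar \
    --conf spark.executor.extraClassPath=/path/to/commons-compress-1.18.jar \
    --class com.example.App \
    ./target/uber-tikaTest-1.19.jar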

I was facing the same problem. I found the answer in this post: enter link description here