为什么除非使用--packages,否则spark-submit找不到kafka数据源?

Use*_*er3 4 maven apache-kafka apache-spark apache-spark-sql spark-structured-streaming

我正在尝试将 Kafka 集成到我的 Spark 应用程序中,这是我的 POM 文件所需的条目:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.stream.kafka.version}</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

对应的工件版本是:

<kafka.version>0.10.2.0</kafka.version>
<spark.stream.kafka.version>2.2.0</spark.stream.kafka.version>
Run Code Online (Sandbox Code Playgroud)

我一直摸不着头脑:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
Run Code Online (Sandbox Code Playgroud)

我也尝试为 jar 提供--jars参数,但是它没有帮助。我在这里缺少什么?

代码:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
Run Code Online (Sandbox Code Playgroud)

_spark 定义为:

_spark = SparkSession
                .builder()
                .appName(_properties.getProperty("app.name"))
                .config("spark.master", _properties.getProperty("master"))
                .config("spark.es.nodes", _properties.getProperty("es.hosts"))
                .config("spark.es.port", _properties.getProperty("es.port"))
                .config("spark.es.index.auto.create", "true")
                .config("es.net.http.auth.user", _properties.getProperty("es.net.http.auth.user"))
                .config("es.net.http.auth.pass", _properties.getProperty("es.net.http.auth.pass"))
                .getOrCreate();
Run Code Online (Sandbox Code Playgroud)

我的进口是:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
Run Code Online (Sandbox Code Playgroud)

但是,当我运行此处提到的代码并且使用 package 选项时:

private static void startKafkaConsumerStream() {

        Dataset<HttpPackage> ds1 = _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .load() // Getting the error here
                .as(Encoders.bean(HttpPackage.class));

        ds1.foreach((ForeachFunction<HttpPackage>)  req ->System.out.print(req));

    }
Run Code Online (Sandbox Code Playgroud)

有用

Jac*_*ski 5

Spark Structured Streaming 使用外部kafka-0-10-sql模块支持 Apache Kafka 作为流源和接收器。

kafka-0-10-sql模块不可用于使用 提交执行的 Spark 应用程序spark-submit。该模块是外部的,要使其可用,您应该将其定义为依赖项。

除非您kafka-0-10-sql在 Spark 应用程序中使用特定于模块的代码,否则不必将模块定义为dependencyin pom.xml。您根本不需要该模块的编译依赖项,因为没有代码使用该模块的代码。您可以针对接口进行编码,这也是 Spark SQL 使用起来如此愉快的原因之一(即,只需很少的编码即可拥有相当复杂的分布式应用程序)。

spark-submit但是需要--packages命令行选项,您已报告它工作正常。

但是,当我运行此处提到的代码并且使用 package 选项时:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Run Code Online (Sandbox Code Playgroud)

它工作得很好的原因--packages是你必须告诉 Spark 基础设施在哪里可以找到kafka格式的定义。

这给我们带来了使用 Kafka 运行流式 Spark 应用程序的另一个“问题”(或要求)。您必须指定模块的运行时依赖关系spark-sql-kafka

您可以使用--packages命令行选项(在您spark-submit的 Spark 应用程序之后下载必要的 jar)或创建所谓的 uber-jar(或 fat-jar)来指定运行时依赖项。

这就是pom.xml发挥作用的地方(这就是为什么人们提供帮助并将pom.xml模块作为dependency)。

因此,首先,您必须在 中指定依赖关系pom.xml

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

最后但并非最不重要的一点是,您必须构建一个pom.xml使用Apache Maven Shade Plugin配置的 uber-jar 。

使用 Apache Maven Shade 插件,您可以创建一个 Uber JARkafka ,其中包含 Spark 应用程序 jar 文件内格式工作的所有“基础设施” 。事实上,Uber JAR 将包含所有必要的运行时依赖项,因此您可以spark-submit单独使用 jar(没有--packages选项或类似选项)。