Unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

Spa*_*r0i 10 apache-kafka apache-spark spark-structured-streaming spark-streaming-kafka

Environment: Spark 2.3.0, Scala 2.11.12, Kafka (whatever the latest version is)


I have a secured Kafka system and I am trying to connect a Spark Streaming consumer to it. Here is my build.sbt file:

name := "kafka-streaming"\nversion := "1.0"\n\nscalaVersion := "2.11.12"\n\n// still want to be able to run in sbt\n// https://github.com/sbt/sbt-assembly#-provided-configuration\nrun in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))\n\nfork in run := true\njavaOptions in run ++= Seq(\n    "-Dlog4j.debug=true",\n    "-Dlog4j.configuration=log4j.properties")\n\nassemblyMergeStrategy in assembly := {\n    case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat\n    case PathList("META-INF", _*) => MergeStrategy.discard\n    case _ => MergeStrategy.first\n}\n\nlibraryDependencies ++= Seq(\n    "org.apache.spark" %% "spark-core" % "2.3.0",\n    "org.apache.spark" %% "spark-sql" % "2.3.0",\n    "org.apache.spark" %% "spark-streaming" % "2.3.0",\n    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",\n    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",\n    "com.ibm.db2.jcc" % "db2jcc" % "db2jcc4"\n)\n

Note that this is Spark 2.3.0, and I cannot change my Spark version.


Now here is the part of the code where I try to connect the Spark Streaming consumer to my secured Kafka:

name := "kafka-streaming"\nversion := "1.0"\n\nscalaVersion := "2.11.12"\n\n// still want to be able to run in sbt\n// https://github.com/sbt/sbt-assembly#-provided-configuration\nrun in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))\n\nfork in run := true\njavaOptions in run ++= Seq(\n    "-Dlog4j.debug=true",\n    "-Dlog4j.configuration=log4j.properties")\n\nassemblyMergeStrategy in assembly := {\n    case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat\n    case PathList("META-INF", _*) => MergeStrategy.discard\n    case _ => MergeStrategy.first\n}\n\nlibraryDependencies ++= Seq(\n    "org.apache.spark" %% "spark-core" % "2.3.0",\n    "org.apache.spark" %% "spark-sql" % "2.3.0",\n    "org.apache.spark" %% "spark-streaming" % "2.3.0",\n    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",\n    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",\n    "com.ibm.db2.jcc" % "db2jcc" % "db2jcc4"\n)\n
Run Code Online (Sandbox Code Playgroud)\n\n
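A minimal sketch of that consumer code (the object name WeatherDataStream comes from the stack trace below; the brokers, topic, and credentials are placeholders, and the exact options in the real code may differ):

import org.apache.spark.sql.SparkSession

object WeatherDataStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-streaming")
      .master("local[*]")
      .getOrCreate()

    // SASL/PLAIN JAAS configuration; username and password are placeholders
    val jaasConfig =
      """org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";"""

    // The load() call below is the line the stack trace marks with >>
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-1:9093,broker-2:9093") // placeholder brokers
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config", jaasConfig)
      .option("subscribe", "weather-topic") // placeholder topic
      .load()

    // Even reaching this point requires the Kafka consumer (and its SASL
    // login) to be constructed, which is where the exception is thrown.
    rawDf.printSchema()
  }
}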

When I try to run the program, the following error is thrown:

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
    >> at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.getRawDataFrame(WeatherDataStream.scala:74)
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.main(WeatherDataStream.scala:24)
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream.main(WeatherDataStream.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 11 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 14 more

The >> in the error log points to the load() call in the snippet above. I have been trying to resolve this for several days now, without much success.


MrE*_*ant 1

I ran into the same error, and the solution was to change the class in jaas.config:

The Confluent documentation + Azure say:

kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
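On that kind of setup (a shaded Kafka client), the full kafka.sasl.jaas.config value would look roughly like this; the key and secret are placeholders:

kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";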

However, with Confluent Cloud the correct class is org.apache.kafka...:

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers.mkString(","))
  .option("kafka.security.protocol", "SASL_SSL")
  .option(
    "kafka.sasl.jaas.config",
    s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${confluentApiKey: String}" password="${confluentSecret: String}";"""
  )
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
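A quick way to check that the login now succeeds is to stream the result straight to the console (a sketch; df stands for the DataFrame returned by load() above):

// df is the streaming DataFrame returned by the reader above.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()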