Spark 结构化流 未授权访问组

Tom*_*loň 2 scala apache-kafka apache-spark spark-streaming spark-structured-streaming

我正在尝试通过 Spark 结构化流从 Kafka 读取数据。但是,在Spark 2.4.0中,您无法为流设置group id(请参阅如何在Structured Streaming中为kafka数据源中的消费者组设置group.id?)。

然而,由于没有设置,spark 只是生成组 Id,而我陷入了 GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2
Run Code Online (Sandbox Code Playgroud)

请问有什么想法可以绕过这个吗?有趣的是,我可以通过 kafka-console-consumer.sh 读取这些数据,我可以在 .properties 文件中传递组 ID。

抛出异常的代码:

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "idThatShouldBeUsed")
  .option("kafka.bootstrap.servers", "server")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/location)
  .option("kafka.ssl.truststore.password", "pass")
  .option("kafka.sasl.jaas.config", """jaasToUse""")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("startingOffsets", "earliest")
  .start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)

Tom*_*loň 5

从消费者的角度来看,这似乎是无法解决的。我们最终不得不使用 bin/kafka-acls.sh 和通配符来允许结构化流生成的所有组 ID。

卡夫卡acl示例:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed
Run Code Online (Sandbox Code Playgroud)