Pan*_*tas 9 apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?
另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?
如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?
根据结构化 Kafka 集成指南,您可以提供 ConsumerGroup 作为选项kafka.group.id:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("kafka.group.id", "myConsumerGroup")
.load()
Run Code Online (Sandbox Code Playgroud)
但是,Spark 不会提交任何偏移量,因此 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题__consumer_offsets中,而是存储在 Spark 的检查点文件中。
能够设置group.id是为了处理 Kafka 的最新功能授权使用基于角色的访问控制,您的 ConsumerGroup 通常需要遵循命名约定。
此处kafka.group.id讨论并解决了Spark 3.x 应用程序设置的完整示例。
目前 (v2.4.0) 这是不可能的。
您可以在 Apache Spark 项目中检查以下几行:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 -在用于创建的属性中设置它KafkaConsumer
在 master 分支中,您可以找到修改,可以设置前缀或特定的group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 -生成组.id 基于组前缀 ( groupidprefix)
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 -之前设置生成的 groupId,如果kafka.group.id没有在属性中传递
| 归档时间: |
|
| 查看次数: |
5504 次 |
| 最近记录: |