如何在Structured Streaming的kafka数据源中为消费者组设置group.id?

Pan*_*tas 9 apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据。这意味着我需要强制使用特定的 group.id。但是,正如文档中所述,这是不可能的。尽管如此,在 databricks 文档https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 中,它说这是可能的。这是否仅指 azure 集群?

另外,通过查看 apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md的 master 分支的文档,我们可以理解这样的功能旨在在以后的 Spark 版本中添加。你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?

如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?

mik*_*ike 7

从 Spark 3.0.0 开始

根据结构化 Kafka 集成指南,您可以提供 ConsumerGroup 作为选项kafka.group.id

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()
Run Code Online (Sandbox Code Playgroud)

但是,Spark 不会提交任何偏移量,因此 ConsumerGroups 的偏移量不会存储在 Kafka 的内部主题__consumer_offsets中,而是存储在 Spark 的检查点文件中。

能够设置group.id是为了处理 Kafka 的最新功能授权使用基于角色的访问控制,您的 ConsumerGroup 通常需要遵循命名约定。

此处kafka.group.id讨论并解决了Spark 3.x 应用程序设置的完整示例。