如何在kafka中创建一个新的消费者组

nmx*_*mxl 5 apache-kafka kafka-consumer-api

我按照此处的快速入门指南的说明在本地运行 kafka ,

然后我定义了我的消费者组配置,config/consumer.properties以便我的消费者可以从定义的group.id

运行以下命令,

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Run Code Online (Sandbox Code Playgroud)

结果是,

test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
Run Code Online (Sandbox Code Playgroud)

我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供group.idtest-consumer-group

首先,我无法理解 kafka 如何/何时创建消费者组。似乎它conf/consumer.properties在某个时间点加载了,另外它在console-consumer-67807通过kafka-console-consumer.sh.

我怎样才能明确地创建我自己的消费者组,比如说my-created-consumer-group

mik*_*ike 6

您没有明确创建消费者组,而是构建始终属于消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka Consumer 都会有一个 Consumer Group。消费者组可以为每个单独的消费者配置。

它似乎在某个时间点加载了 conf/consumer.properties 并且另外它在通过 kafka-console-consumer.sh 连接时隐式地创建了消费者组(在我的例子中是 console-consumer-67807)

如果您不告诉您的控制台使用者实际使用该文件,则不会考虑该文件。

有以下替代方法可以提供消费者组的名称:

带有属性文件的控制台消费者 (--consumer.config)

这是文件的config/consumer.properties样子

# consumer group id
group.id=my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)

这就是您如何确保控制台消费者考虑group.id到这一点:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
Run Code Online (Sandbox Code Playgroud)

带 --group 的控制台消费者

对于控制台消费者,消费者组会自动创建,前缀为“console-consumer”,后缀类似于 PID,除非您通过添加--group以下内容来提供自己的消费者组:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)

基于标准代码的消费者 API

使用标准 JAVA/Scala/... Consumer API 时,您可以通过以下属性提供 Consumer Group:

# consumer group id
group.id=my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)

  • 是的,是的:)如果具有随机group.id的消费者第一次消费消息,Kafka会检查该消费者组是否已经存在。如果该消费者组之前不存在,则消费者将从头开始阅读该主题(除非另有说明)。 (2认同)