nmx*_*mxl 5 apache-kafka kafka-consumer-api
我按照此处的快速入门指南的说明在本地运行 kafka ,
然后我定义了我的消费者组配置,config/consumer.properties
以便我的消费者可以从定义的group.id
运行以下命令,
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Run Code Online (Sandbox Code Playgroud)
结果是,
test-consumer-group <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
Run Code Online (Sandbox Code Playgroud)
我能够通过基于 python 的消费者连接到 kafka,该消费者被配置为使用提供group.id
即test-consumer-group
首先,我无法理解 kafka 如何/何时创建消费者组。似乎它conf/consumer.properties
在某个时间点加载了,另外它在console-consumer-67807
通过kafka-console-consumer.sh
.
我怎样才能明确地创建我自己的消费者组,比如说my-created-consumer-group
?
您没有明确创建消费者组,而是构建始终属于消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka Consumer 都会有一个 Consumer Group。消费者组可以为每个单独的消费者配置。
它似乎在某个时间点加载了 conf/consumer.properties 并且另外它在通过 kafka-console-consumer.sh 连接时隐式地创建了消费者组(在我的例子中是 console-consumer-67807)
如果您不告诉您的控制台使用者实际使用该文件,则不会考虑该文件。
有以下替代方法可以提供消费者组的名称:
这是文件的config/consumer.properties
样子
# consumer group id
group.id=my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)
这就是您如何确保控制台消费者考虑group.id
到这一点:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
Run Code Online (Sandbox Code Playgroud)
对于控制台消费者,消费者组会自动创建,前缀为“console-consumer”,后缀类似于 PID,除非您通过添加--group
以下内容来提供自己的消费者组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)
使用标准 JAVA/Scala/... Consumer API 时,您可以通过以下属性提供 Consumer Group:
# consumer group id
group.id=my-created-consumer-group
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
11562 次 |
最近记录: |