了解消费者群体ID

Ank*_*kit 14 java apache-kafka kafka-consumer-api

我做了Apache Kafka 0.10.1.0的全新安装.

我能够在命令提示符下发送/接收消息.

使用Producer/Consumer Java示例时,我无法知道Consumer Example上的group.id参数.

让我知道如何解决这个问题.

以下是我用过的消费者示例:

public static void main(String[] args) {
             Properties props = new Properties();
             props.put("bootstrap.servers", "localhost:9092");
             props.put("group.id", "my-topic");
             props.put("enable.auto.commit", "true");
             props.put("auto.commit.interval.ms", "1000");
             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             try {
                 consumer.subscribe(Arrays.asList("my-topic"));

                     ConsumerRecords<String, String> records = consumer.poll(100);
                     System.err.println("records size=>"+records.count());
                     for (ConsumerRecord<String, String> record : records) 
                         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());



              }
             catch (Exception ex){
                 ex.printStackTrace();
             }
            finally {
                 consumer.close();
            }
        }
Run Code Online (Sandbox Code Playgroud)

在为消费者运行命令之后,我可以看到生产者发布的消息(在控制台上).但无法从java程序中看到消息

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 - topic my-topic --from-beginning

Raz*_*ssi 22

消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例.消费者实例可以在单独的进程中,也可以在不同的机器

如果所有使用者实例具有相同的使用者组,则记录将有效地在使用者实例上进行负载平衡.

如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程.

group.id是一个字符串,用于唯一标识此使用者所属的使用者进程组.

(卡夫卡介绍)

  • 您可以使用任何字符串,如果要运行两个使用相同字符串的使用者,它们将位于同一组中。 (2认同)

小智 5

以下是分区和消费者属性group.id的一些测试结果

 Properties props = new Properties();
  //set all other properties as required
  props.put("group.id", "ConsumerGroup1");
  props.put("max.poll.records", "1");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Run Code Online (Sandbox Code Playgroud)

Consumer.group id 用于对生产的数据进行负载均衡(如果每个消费者的group.id不同,则每个消费者将获得数据的副本)

如果分区 = 1 并且消费者总数 = 2,则只有二分之一的活动消费者会获取数据

如果分区=2并且消费者总数=2,则两个活跃消费者中的每一个均匀地获取数据

如果分区 = 3 并且消费者总数 = 2,则两个活跃消费者中的每一个都将获取数据。一个消费者从 2 个分区获取数据,另一个消费者从 1 个分区获取数据。

如果分区 = 3 并且消费者总数 = 3,则三个活跃消费者中的每一个都会均匀地获取数据。