Ank*_*kit 14 java apache-kafka kafka-consumer-api
我做了Apache Kafka 0.10.1.0的全新安装.
我能够在命令提示符下发送/接收消息.
使用Producer/Consumer Java示例时,我无法知道Consumer Example上的group.id参数.
让我知道如何解决这个问题.
以下是我用过的消费者示例:
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-topic");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(100);
System.err.println("records size=>"+records.count());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
catch (Exception ex){
ex.printStackTrace();
}
finally {
consumer.close();
}
}
Run Code Online (Sandbox Code Playgroud)
在为消费者运行命令之后,我可以看到生产者发布的消息(在控制台上).但无法从java程序中看到消息
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 - topic my-topic --from-beginning
小智 5
以下是分区和消费者属性group.id的一些测试结果
Properties props = new Properties();
//set all other properties as required
props.put("group.id", "ConsumerGroup1");
props.put("max.poll.records", "1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Run Code Online (Sandbox Code Playgroud)
Consumer.group id 用于对生产的数据进行负载均衡(如果每个消费者的group.id不同,则每个消费者将获得数据的副本)
如果分区 = 1 并且消费者总数 = 2,则只有二分之一的活动消费者会获取数据
如果分区=2并且消费者总数=2,则两个活跃消费者中的每一个均匀地获取数据
如果分区 = 3 并且消费者总数 = 2,则两个活跃消费者中的每一个都将获取数据。一个消费者从 2 个分区获取数据,另一个消费者从 1 个分区获取数据。
如果分区 = 3 并且消费者总数 = 3,则三个活跃消费者中的每一个都会均匀地获取数据。