Max*_*aga 3 java apache-kafka kafka-consumer-api
目标:读取来自主题的所有消息,然后终止进程。
我能够连续阅读以下消息:
props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
process_record(record);
}
consumer.commitSync();
}
Run Code Online (Sandbox Code Playgroud)
但在这种情况下,进程永远不会终止。当我摆脱
while (true)
Run Code Online (Sandbox Code Playgroud)
循环并运行程序,它不会从主题中获取一条记录(我希望有一条记录)。这是为什么?
Kafka 主题基本上实现了无限的事件流。
那么从主题消费时什么时候停止?你怎么知道你走到了尽头?简短的回答是你没有!理论上,生产者总是可以向主题发送新消息。
在实践中,假设没有/很少有新记录被附加,你可以做一些事情来结束。
使用 endOffsets()您可以找到分区的当前最后一个偏移量。一旦消费者达到它分配到的所有分区的偏移量,您就可以停止轮询(或刷新它并查看是否已发送新消息)。
您可以使用该position()方法检索每个分区中的当前位置。消费时,每条记录还通过offset(). 因此,您可以使用这些来跟踪端部偏移的进度。
关于您poll()第一次调用时不返回任何内容的第二个问题。这是预期的,因为基本上poll()使客户端工作,并且在第一次调用时,它将启动与集群的连接并启动组协议(这需要几秒钟),因此在poll()返回之前不太可能已经收到消息。
| 归档时间: |
|
| 查看次数: |
3701 次 |
| 最近记录: |