gsc*_*441 12 apache-kafka kafka-consumer-api
我想从服务器的主题开始获取所有消息。
前任:
bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --topic testTopic --from-beginning
使用上述控制台命令时,我希望能够从一开始就获取主题中的所有消息,但我无法使用 java 代码从一开始就使用主题中的所有消息。
最简单的方法是启动一个消费者并清除所有消息。现在我不知道您的主题中有多少个分区以及您是否已经有一个现有的消费者组,但是您有几个选择:
看看这个 API:https : //kafka.apache.org/090/javadoc/index.html? org/ apache/ kafka/ clients/ consumer/ KafkaConsumer.html
1)如果您已经在同一个消费组中有一个消费者,并且仍然想从头开始消费,您应该使用seek
API doc中列出的选项,并将组中每个消费者的偏移量设置为0。这将从一开始就开始消耗。
2)否则,你可以在一个新的消费群中启动几个消费者,你就不用担心seek了。
PS:如果您对Kafka有更多疑问,请记住提供有关您的设置的更多详细信息。很多事情取决于你如何配置你的基础设施以及你希望它如何,因此会因情况而异。
您可以使用以下命令获取所有消息:
cd Users/kv/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topicName --from-beginning --max-messages 100
Run Code Online (Sandbox Code Playgroud)
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition);
consumer.assign(partitions); consumer.seekToBeginning(partitions);
Run Code Online (Sandbox Code Playgroud)