use*_*283 7 java-api consumer apache-kafka
我正在使用Kafka站点的ConsumerGroupExample代码测试Kafka High Level Consumer.我想检索我在Kafka服务器配置中名为"test"的主题的所有现有消息.查看其他博客,auto.offset.reset应设置为"最小",以便能够获取所有消息:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
Run Code Online (Sandbox Code Playgroud)
我真正拥有的问题是:高级消费者的等效Java api调用相当于:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic test - from-beginning
基本上,每当新消费者尝试使用主题时,它都会从头开始阅读消息.如果您每次只是为了测试目的而从头开始消费,那么每次使用新的groupID初始化您的消费者时,它都会从头开始阅读消息.我是这样做的:
properties.put("group.id", UUID.randomUUID().toString());
Run Code Online (Sandbox Code Playgroud)
并且每次从头开始阅读消息!
看起来你需要使用"低级SimpleConsumer API"
对于大多数应用程序,高级别的消费者Api足够好.某些应用程序还需要尚未向高级消费者公开的功能(例如,在重新启动消费者时设置初始偏移).他们可以使用我们的低级SimpleConsumer Api.逻辑会有点复杂,你可以按照这里的例子.
此示例用于从具有以下参数的主题获取所有消息:(请注意,端口是Kafka端口,而不是ZooKeeper端口,从此示例设置的主题):
10 my-replicated-topic 0 localhost 9092
Run Code Online (Sandbox Code Playgroud)
具体来说,有一个获取readOffset的方法,它采用kafka.api.OffsetRequest.EarliestTime():
long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
Run Code Online (Sandbox Code Playgroud)
这是另一篇文章可能提供一些关于如何解决这个问题的替代想法:如何从卡夫卡的旧偏移点获取数据?