每次运行消费者jar时,请任何人都可以告诉我如何使用Kafka Consumer API从一开始就阅读消息.
我对使用kafka主题的代码进行了一些JUnit测试.我试过的模拟kafka主题不起作用,在网上找到的例子很老,所以它们也不适用于0.8.2.1.如何使用0.8.2.1创建模拟kafka主题?
澄清:我选择使用该主题的实际嵌入实例,以便使用真实实例进行测试,而不是在mockito中模拟手.这样我可以测试我的自定义编码器和解码器实际工作,并且当我使用真正的kafka实例时它不会失败.
我正在使用Kafka站点的ConsumerGroupExample代码测试Kafka High Level Consumer.我想检索我在Kafka服务器配置中名为"test"的主题的所有现有消息.查看其他博客,auto.offset.reset应设置为"最小",以便能够获取所有消息:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
Run Code Online (Sandbox Code Playgroud)
我真正拥有的问题是:高级消费者的等效Java api调用相当于:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic test - from-beginning