Kafka高级消费者使用Java API从主题获取所有消息(相当于 - 从头开始​​)

use*_*283 7 java-api consumer apache-kafka

我正在使用Kafka站点的ConsumerGroupExample代码测试Kafka High Level Consumer.我想检索我在Kafka服务器配置中名为"test"的主题的所有现有消息.查看其他博客,auto.offset.reset应设置为"最小",以便能够获取所有消息:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)    {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");     

    return new ConsumerConfig(props);
}
Run Code Online (Sandbox Code Playgroud)

我真正拥有的问题是:高级消费者的等效Java api调用相当于:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic test - from-beginning

Kar*_*ati 7

基本上,每当新消费者尝试使用主题时,它都会从头开始阅读消息.如果您每次只是为了测试目的而从头开始消费,那么每次使用新的groupID初始化您的消费者时,它都会从头开始阅读消息.我是这样做的:

properties.put("group.id", UUID.randomUUID().toString());
Run Code Online (Sandbox Code Playgroud)

并且每次从头开始阅读消息!


phe*_*ris 5

看起来你需要使用"低级SimpleConsumer API"

对于大多数应用程序,高级别的消费者Api足够好.某些应用程序还需要尚未向高级消费者公开的功能(例如,在重新启动消费者时设置初始偏移).他们可以使用我们的低级SimpleConsumer Api.逻辑会有点复杂,你可以按照这里的例子.

此示例用于从具有以下参数的主题获取所有消息:(请注意,端口是Kafka端口,而不是ZooKeeper端口,从此示例设置的主题):

10 my-replicated-topic 0 localhost 9092
Run Code Online (Sandbox Code Playgroud)

具体来说,有一个获取readOffset的方法,它采用kafka.api.OffsetRequest.EarliestTime():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
Run Code Online (Sandbox Code Playgroud)

这是另一篇文章可能提供一些关于如何解决这个问题的替代想法:如何从卡夫卡的旧偏移点获取数据?