如何让kafka消费者从上次消费的偏移中读取,而不是从一开始就读取

Sri*_*ini 10 apache-kafka kafka-consumer-api

我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.

我正在写一个案例,所以我的意图不会偏离.

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM
Run Code Online (Sandbox Code Playgroud)

有没有办法从最后消耗的偏移量中获取消息.?

小智 9

我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.

是的,可以使用控制台消费者来读取上次消耗的偏移量.您必须在调用kafka-console-consumer时添加consumer.config标志.

例:-

[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties
Run Code Online (Sandbox Code Playgroud)

这里/home/mrnakumar/consumer.properties是一个包含group.id的文件.以下是/home/mrnakumar/consumer.properties的外观: -

group.id = consoleGroup

使用consumer.config,可以从[开始使用--from-beginning]开始读取,也可以从仅Log 开始读取.日志结束表示消费者启动后发布的所有消息.

  • 是的,如果我们给出任何组 ID,那么就会从最后一个消费点读取数据。如果我们在没有组 ID 的情况下运行,它只会考虑启动后的数据..谢谢.. (2认同)

Geo*_*ith 7

在消费者配置中设置auto.offset.reset=earliest, AND a fixedgroup.id=something将在最后提交的偏移量处启动消费者。在您的情况下,它应该在 7:20 的第一条消息开始消费。如果您希望它在启动后开始阅读发布的消息,那么auto.offset.reset=latest它将忽略在 7:20 发送的 10 条消息并读取它启动后传入的任何消息。

如果您希望它从头开始,您必须seekToBeginning在第一个之后调用consumer.poll(),或者将消费者组 ID 更改为唯一的。


cod*_*tsu 5

您应该auto.offset.reset在消费者配置中将参数设置为 on largest,以便它将读取最后提交的偏移量之后的所有消息。