Sri*_*ini 10 apache-kafka kafka-consumer-api
我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.
我正在写一个案例,所以我的意图不会偏离.
Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM
Run Code Online (Sandbox Code Playgroud)
有没有办法从最后消耗的偏移量中获取消息.?
小智 9
我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.
是的,可以使用控制台消费者来读取上次消耗的偏移量.您必须在调用kafka-console-consumer时添加consumer.config标志.
例:-
[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties
Run Code Online (Sandbox Code Playgroud)
这里/home/mrnakumar/consumer.properties是一个包含group.id的文件.以下是/home/mrnakumar/consumer.properties的外观: -
group.id = consoleGroup
使用consumer.config,可以从[开始使用--from-beginning]开始读取,也可以从仅Log 开始读取.日志结束表示消费者启动后发布的所有消息.
在消费者配置中设置auto.offset.reset=earliest, AND a fixedgroup.id=something将在最后提交的偏移量处启动消费者。在您的情况下,它应该在 7:20 的第一条消息开始消费。如果您希望它在启动后开始阅读发布的消息,那么auto.offset.reset=latest它将忽略在 7:20 发送的 10 条消息并读取它启动后传入的任何消息。
如果您希望它从头开始,您必须seekToBeginning在第一个之后调用consumer.poll(),或者将消费者组 ID 更改为唯一的。
| 归档时间: |
|
| 查看次数: |
13895 次 |
| 最近记录: |