mak*_*dos 2 apache-kafka kafka-consumer-api
我有一台服务器,需要保留所有用户的内存缓存。因此,假设列表不会很大-数十万个项目,我想使用带有密钥消息的Kafka主题,其中key是userId,以保持该列表的当前状态,并且管理应用程序将发送新的用户对象当事情发生变化时,就可以讨论该主题。因此,当服务器启动时,它只需要从头开始读取该主题的所有内容,并填充其缓存。
填充阶段大约需要20到30秒,具体取决于与Kafka的连接,因此服务器无需等待联机,直到它从该主题读取所有内容以拥有最新的缓存(开始时该主题中的所有消息)被认为是最新的)。但是我看不到如何确定是否从Kafka流中读取所有内容以通知其他服务缓存已填充并且服务器可以启动服务器请求。我已经读过有关高水印的内容,但是看不到它在Java消费者API中公开。
那么,如何找出Kafka主题的最新偏移量,以了解我的读者何时是最新的?
小智 5
假设您正在使用高级消费者。
高水位标记在高级消费者中不可用。
**As you mentioned: all the messages in the topic at the moment of start is considered up-to-date**
Run Code Online (Sandbox Code Playgroud)
当您的应用程序启动时,您可以使用SimpleConsumer Api执行以下操作:
通过向kafka集群中的任何代理发出TopicMetadataRequest来找到主题中的分区数。
将分区创建为latestOffset映射,其中键为分区,值为该分区中可用的latestOffset。
Map <Integer,Integer> offsetMap =新的HashMap <>()
对于主题中的每个分区p:
A.找到分区p的头
B.向主持人发送OffsetRequest
C.从OffsetResponse获取最新的Offset
D.在offsetMap中添加一个项,其中键是分区p,offset是latestOffset。
开始使用高级消费者从kafka读取消息:
答:对于从KafkaStream获得的每条消息:
AA. Get the partition && offset of the message
BB. if( offsetMap.get(partition)<=offset) stop Reading from this steam
Run Code Online (Sandbox Code Playgroud)希望这可以帮助。
| 归档时间: |
|
| 查看次数: |
4163 次 |
| 最近记录: |