我遇到的情况是,1 个线程正在快速消耗来自 Kafka 主题的消息,并将它们放入阻塞队列中,然后在另一个将批量插入写入 mongo 数据库集合的线程中消耗该消息。我没有看到很多答案,因为这是一个常见问题,我的应用程序正在崩溃,因为消息 q 变得如此之大并且内存不足,因为 mongo db writer 线程无法跟上消息消耗率。
配置kafka消费者暂停消息消费一段时间直到消息q恢复到合理大小的正确方法是什么?我可以在池循环中暂停或执行其他操作吗?我不这么认为,否则消费者将被标记为不在线,我是否可以在每次消息 q 太大时关闭 Kafka 消费者,然后在其恢复到可管理的大小时重新连接?我可以,但这似乎不是一个干净的解决方案,我正在寻找的是说“嘿卡夫卡,请暂停向我的活跃消费者发送消息,直到我告诉你恢复”,因为这将允许我以最快的速度拉取消息我可以将它们插入我的数据存储中。
请帮忙!
小智 2
kafka api中有一个暂停和恢复方法 https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)
如果您检查消耗流量控制部分,它会说明以下内容:
Kafka支持动态控制消费流,通过使用pause(Collection)和resume(Collection)来暂停指定分配分区上的消费,并在以后的poll(long)调用中分别恢复指定暂停分区上的消费。
| 归档时间: |
|
| 查看次数: |
5515 次 |
| 最近记录: |