Pri*_*ngh 6 apache-kafka kafka-consumer-api pykafka
我正在使用跨多台机器的通用消费者组管理卡夫卡队列。现在我还需要显示队列的当前内容。如何只读取组内尚未读取的消息,同时使这些消息再次被组中实际处理这些消息的其他消费者读取。任何帮助,将不胜感激。
在 Kafka 中,从主题“读取”消息和“消费”消息的概念是同一回事。在较高级别上,使“已消费”消息对消费者不可用的唯一原因是消费者将其读取偏移量设置为超出相关消息的值。因此,您可以关闭使用者的自动提交功能,并避免在您只想“读取”而不是“使用”的情况下提交偏移量。
获取“所有尚未读取的消息”的一个很好的代理是将最新提交的偏移量与每个分区的高水位标记偏移量进行比较。这提供了“滞后”的概念,指示给定消费者在其分区的消费中落后了多远。pykafka 中的 CLI函数fetch_consumer_lag是如何执行此操作的一个很好的示例。
| 归档时间: |
|
| 查看次数: |
18624 次 |
| 最近记录: |