Kan*_*nan 5 apache-kafka kafka-consumer-api
我需要从某个主题读取消息,对它们进行批处理并将批处理推送到外部系统。如果批次因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批次,每个分区的起始和终止偏移量都存储在数据库中。为了实现这一目标,我通过将分区分配给读取器来为每个分区创建一个 Kafka 消费者,根据之前存储的偏移量,消费者寻找该位置并开始读取。我已经关闭了自动提交,并且不提交来自消费者的偏移量。对于每个批次,我为每个分区创建一个新的使用者,从存储的最后一个偏移量读取消息并将其发布到外部系统。您是否发现在不提交偏移量的情况下消费消息以及跨批次使用相同的消费者组但在任何时候每个分区不会有多个消费者的任何问题?
你的设计对我来说似乎很合理。
向 Kafka 提交偏移量只是 Kafka 中用于跟踪偏移量的一种方便的内置机制。但是,没有任何要求使用它 - 您也可以使用任何其他机制来跟踪偏移量(例如在您的情况下使用数据库)。
而且,如果你手动分配分区,无论如何也不会有组管理。所以参数group.id
没有影响。有关更多详细信息,请参阅http://docs.confluence.io/current/clients/consumer.html 。
归档时间: |
|
查看次数: |
7020 次 |
最近记录: |