Vis*_*uli 5 java apache-kafka spring-boot kafka-consumer-api
我在我的框架中使用 Kafka 生产者-消费者模型。消费者端消费的记录随后被索引到elasticsearch上。这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的位置的记录。我认为@KafkaListener 无法实现这一点。谁能给我一个解决方案吗?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。
有多种可能的解决方案,一种简单的方法是使用 KafkaConsumer API。在 KafkaConsumer 实现中,跟踪主题的位置,该位置将在下次调用 poll(...) 时检索。您的问题是从 Kafka 获取记录后,您可能无法将其插入到 Elastic Search 中。在这种情况下,您必须编写一个例程来重置消费者的位置,在您的情况下将是consumer.seek(partition,consumer.position(partition)-1)。这会将位置重置为较早的位置。此时,一个好的方法是暂停分区(这将使服务器能够进行一些资源清理),然后轮询 ES(通过您想要的任何机制)。一旦 ES 可用,请在消费者上调用resume并继续通常的轮询插入周期。
讨论后编辑
创建一个具有指定生命周期方法的 spring bean。在bean的初始化方法中实例化您的KafkaConsumer(从任何源检索消费者的配置)。从该方法启动一个线程与consumer交互并更新ES,其余设计如上。这是单线程模型。为了获得更高的吞吐量,请考虑将从 Kafka 检索到的数据保留在较小的内存队列中,并使用调度程序线程来获取消息并将其提供给池化线程以更新 ES。
| 归档时间: |
|
| 查看次数: |
19350 次 |
| 最近记录: |