检查Kafka队列是否为空

Jak*_*ake 0 clojure apache-kafka clj-kafka

现在我有将几百条消息写入 kafka 队列的功能。但是当所有这些消息都被消耗掉时,我还需要执行额外的功能。有没有办法在 kafka 队列上放置一个侦听器,以便在它被清空时得到通知?

ser*_*jja 5

你可以通过两种方式解决,我认为:

  1. Kafka 的Fetch Response包含一个HighwaterMarkOffset,它本质上是分区中最后一条消息的偏移量。您可以检查您的消息是否具有该偏移量,如果有 - 您已到达结尾。但是,如果您让生产者和消费者同时工作,这将不起作用 - 消费者可以更快地消费消息,从而比您需要的更早停止。
  2. 发送“毒丸”消息 - 假设您需要生成 100 条消息。然后您的生产者发送这 100 条消息 + 1 条特殊消息(例如一些 UUID,但请确保它在您的逻辑中的正常情况下永远不会出现),这意味着“结束”。在消费者方面,您将检查收到的消息是否是毒丸,如果是则关闭。