Spring Kafka - 为任何主题的分区消耗最后 N 条消息

use*_*814 5 java spring apache-kafka apache-kafka-streams spring-kafka

我正在尝试读取请求的 kafka 消息数。对于非事务性消息,我们将从 endoffset 中寻找 - M 个分区的 N 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务消息,我们必须考虑事务标记/重复消息,这意味着偏移量将不连续,在这种情况下,endoffset - N 不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息对于每个分区或达到起始偏移量

由于有多个分区,我需要跟踪读取的所有偏移量,以便在完成所有操作后停止。有两个步骤,第一步计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。(偏移量不连续,存在间隙),我会寻找分区从起始偏移量开始消耗。第二步是轮询消息并计算每个分区中的消息数,如果我们没有满足请求的消息数,则再次重复第一步和第二步,直到我们满足每个分区的消息数。

状况

初始轮询可能不会返回任何记录,因此继续轮询。当您达到每个分区的结束偏移或轮询不返回结果时停止轮询。检查每个分区读取的消息与请求的消息相同。如果是标记为完成,如果没有标记为继续并重复步骤。考虑消息中的差距。应该适用于事务性和非事务性生产者。

题:

我将如何跟踪每个分区已读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序排列。

spring kafka 是否支持这样的用例?可以在此处找到更多详细信息

更新:我要求读取每个分区中的最后 N 条消息。分区和没有消息是用户输入。我想将所有偏移管理保留在内存中。本质上,我们试图以 LIFO 顺序读取消息。这使它变得棘手,因为 Kafka 允许您向前阅读而不是向后阅读。

Muk*_*sal 0

为什么有这样的需要,我不明白。当队列中没有任何内容时,Kafka 本身会进行管理。如果消息从一个状态跳转到另一个状态,则可以有单独的队列/主题。然而,这里是如何做到这一点的。

当我们使用来自分区的消息时,例如 -

ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
  MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
  String kafkaMessage = new String(messageAndMetadata.message());
  int partition = messageAndMetadata.partition();
  long offset = messageAndMetadata.offset();
  boolean processed = false;
  do{
    long maxOffset = something; //fetch from db
    //if offset<maxOffset, then process messages and manual commit
    //else busy wait or something more useful
  }while(processed);
}
Run Code Online (Sandbox Code Playgroud)

我们获得有关偏移量、分区号和消息本身的信息。您可以选择使用此信息执行任何操作。

对于您的用例,您可能还决定将消耗的偏移量保存到数据库中,以便下次可以调整偏移量。另外,我建议关闭连接进行清理,并最终将处理后的偏移量保存到数据库。