use*_*814 5 java spring apache-kafka apache-kafka-streams spring-kafka
我正在尝试读取请求的 kafka 消息数。对于非事务性消息,我们将从 endoffset 中寻找 - M 个分区的 N 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务消息,我们必须考虑事务标记/重复消息,这意味着偏移量将不连续,在这种情况下,endoffset - N 不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息对于每个分区或达到起始偏移量
由于有多个分区,我需要跟踪读取的所有偏移量,以便在完成所有操作后停止。有两个步骤,第一步计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。(偏移量不连续,存在间隙),我会寻找分区从起始偏移量开始消耗。第二步是轮询消息并计算每个分区中的消息数,如果我们没有满足请求的消息数,则再次重复第一步和第二步,直到我们满足每个分区的消息数。
状况
初始轮询可能不会返回任何记录,因此继续轮询。当您达到每个分区的结束偏移或轮询不返回结果时停止轮询。检查每个分区读取的消息与请求的消息相同。如果是标记为完成,如果没有标记为继续并重复步骤。考虑消息中的差距。应该适用于事务性和非事务性生产者。
题:
我将如何跟踪每个分区已读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序排列。
spring kafka 是否支持这样的用例?可以在此处找到更多详细信息
更新:我要求读取每个分区中的最后 N 条消息。分区和没有消息是用户输入。我想将所有偏移管理保留在内存中。本质上,我们试图以 LIFO 顺序读取消息。这使它变得棘手,因为 Kafka 允许您向前阅读而不是向后阅读。
为什么有这样的需要,我不明白。当队列中没有任何内容时,Kafka 本身会进行管理。如果消息从一个状态跳转到另一个状态,则可以有单独的队列/主题。然而,这里是如何做到这一点的。
当我们使用来自分区的消息时,例如 -
ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
String kafkaMessage = new String(messageAndMetadata.message());
int partition = messageAndMetadata.partition();
long offset = messageAndMetadata.offset();
boolean processed = false;
do{
long maxOffset = something; //fetch from db
//if offset<maxOffset, then process messages and manual commit
//else busy wait or something more useful
}while(processed);
}
Run Code Online (Sandbox Code Playgroud)
我们获得有关偏移量、分区号和消息本身的信息。您可以选择使用此信息执行任何操作。
对于您的用例,您可能还决定将消耗的偏移量保存到数据库中,以便下次可以调整偏移量。另外,我建议关闭连接进行清理,并最终将处理后的偏移量保存到数据库。
| 归档时间: |
|
| 查看次数: |
1190 次 |
| 最近记录: |