从特定主题中检索 Kafka 消费者的最后 n 条消息

pra*_*mar 2 java apache-kafka kafka-consumer-api kafka-records

卡夫卡版本:0.9.0.1

如果n = 20,我必须获取某个主题的最后 20 条消息。

我试过

kafkaConsumer.seekToBeginning();
Run Code Online (Sandbox Code Playgroud)

但它检索所有消息。我只需要获取最后 20 条消息。

这个话题可能有几十万条记录

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}
Run Code Online (Sandbox Code Playgroud)

Thi*_*ruG 5

您可以使用kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions)来寻找给定分区的最后一个偏移量。根据文档:

"寻找每个给定分区的最后一个偏移量。这个函数延迟评估,只在调用poll(Duration)或被position(TopicPartition)调用时寻找所有分区中的最终偏移量。如果没有提供分区,则寻找所有当前分配的分区的最终偏移量.”

然后,您可以使用 检索特定分区的位置position(TopicPartition partition)

然后您可以从中减少 20,并用于kafkaConsumer.seek(TopicPartition partition, long offset)获取最近的 20 条消息。

简单地,

kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);
Run Code Online (Sandbox Code Playgroud)

现在您可以使用检索最近的 20 条消息 poll()

这是简单的逻辑,但如果您有多个分区,则还必须考虑这些情况。我没有尝试这个,但希望你能理解这个概念。