Retrieve the last n messages from a particular topic with a Kafka consumer

Kafka version: 0.9.0.1

If n = 20, I need to fetch the last 20 messages of a topic.

I tried

kafkaConsumer.seekToBeginning();

But that retrieves all messages. I only need the last 20.

The topic may contain hundreds of thousands of records.

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) { …
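One possible approach, sketched here under assumptions rather than taken from the question: instead of seeking to the beginning, work out the offset that lies n records before the end of the partition and start reading from there. With the 0.9 consumer this would presumably mean calling `assign` for the single partition (the code above uses partition 0), then `seekToEnd(topicPartition)` followed by `position(topicPartition)` to learn the end offset, then `seek(topicPartition, start)` and polling until the position reaches the end offset. The class below only shows the offset arithmetic; `LastNOffsets` and `startOffset` are hypothetical names, and the offsets passed in are made-up values, not results from a real broker.

```java
public class LastNOffsets {

    /**
     * Returns the first offset to read so that at most n records are
     * consumed before reaching endOffset.
     *
     * beginningOffset: earliest offset still available in the partition
     * endOffset:       offset of the next record to be written (exclusive)
     */
    static long startOffset(long beginningOffset, long endOffset, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        // Never seek before the earliest offset that still exists.
        return Math.max(beginningOffset, endOffset - n);
    }

    public static void main(String[] args) {
        // Partition with 250,000 records, last 20 wanted.
        System.out.println(startOffset(0L, 250_000L, 20)); // prints 249980
        // Partition with fewer records than requested.
        System.out.println(startOffset(0L, 5L, 20));       // prints 0
    }
}
```

Note that this covers one partition only. For a topic with several partitions the same calculation would be needed per partition, and on a compacted topic the offsets are not contiguous, so fewer than n records may come back.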

java apache-kafka kafka-consumer-api kafka-records

2 votes · 1 answer · 10k views