Kafka:如何使用 Java API 从主题中删除记录?

4 java apache-kafka

我正在寻找一种从 Kafka 主题中删除(完全删除)消耗记录的方法。我知道有几种方法可以做到这一点,例如,通过更改主题的保留时间或删除 Kafka-logs 文件夹。但我正在寻找的是一种使用 Java API 删除某个主题的一定数量记录的方法,如果可能的话。

我试过测试 AdminClient API,特别是adminclient.deleteRecords(recordsToDelete)方法。但是,如果我没记错的话,该方法仅更改主题中的偏移量,实际上并未从硬盘驱动器中删除所述记录。

是否有真正从硬盘驱动器中删除记录的 Java API?

tri*_*rix 5

这让我一开始也有点困惑,为什么包含的 bin/kafka-delete-records.sh 能够删除但我不能使用 Java API

缺少的部分是您需要调用 KafkaFuture.get() 因为 deleteRecords 返回 Futures 的映射

这是代码

在这段代码中,您需要调用 entry.getValue().get().lowWatermark()

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();
Run Code Online (Sandbox Code Playgroud)


ome*_*ack 5

我可以删除。如果 linux 在一台机器上,它会从硬盘中删除它。当我从互联网上搜索时,我发现windows中有一个错误。但是,我在 Windows 中找不到此错误的解决方案。如果 kafka 在 linux 机器上运行,则此代码有效。

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
       TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
       Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
       deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
       kafkaAdminClient.deleteRecords(deleteMap);
}
Run Code Online (Sandbox Code Playgroud)


Roe*_*rel -4

Kafka不支持从主题中删除记录。因此 Kafka 中的客户端基本上处于“只读”模式,无法更改缓冲区。考虑这样的情况:多个不同的客户端(不同的客户端组)读取同一主题并且每个客户端都保存自己的偏移量。如果有人开始从共享缓冲区中删除消息会发生什么?这是不可能的。

消息将根据保留策略从主题中删除。这将确保消息也按顺序删除。