卡夫卡没有用墓碑删除密钥

Sam*_*Sam 7 apache-kafka

我用以下属性创建了一个kafka主题

min.cleanable.dirty.ratio = 0.01,delete.retention.ms = 100,segment.ms = 100,cleanup.policy =紧凑

假设我按顺序插入kv对1111:1,1111:2,1111:null,2222:1 现在发生的事情是除了上一条消息之外,日志压缩在其余消息上运行并清除前两条但保留1111:null

根据文件,

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period.
Run Code Online (Sandbox Code Playgroud)

所以,我希望当达到delete.retention.ms时,空标记应该用键1111删除该消息

我有两个问题 - 为什么墓碑标记不起作用?为什么压缩会忽略最后一条消息?

这就是server.properties文件的内容 -

log.retention.ms=100
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 12

墓碑记录的设计保留时间更长.原因是,经纪人不追踪消费者.假设消费者在阅读第一条记录后离线一段时间.在消费者瘫痪的同时,日志压缩开始了.如果日志压缩会删除逻辑删除记录,则消费者永远不会了解记录被删除的事实.如果使用者实现了缓存,则可能会发生永远不会删除记录的情况.因此,墓碑保存的时间更长,以允许离线消费者接收所有墓碑以进行本地清理.

只有在delete.retention.ms(默认值为1天)之后才会删除墓碑.注意:这是主题级别配置,并且没有代理级别配置.因此,如果要更改它,则需要为每个主题设置配置.


Hem*_*nth 11

压缩的主题有两个部分:

1) 清洁部分:kafka 清洁剂至少清洁过一次的 kafka 原木部分。

2) 脏部分:kafka 原木的部分直到现在甚至一次都没有被 kafka 清洁剂清理过。Kafka 维护脏偏移。偏移量 >= 脏偏移量的所有消息都属于脏部分。

注意:Kafka 清洁器会清除所有段(无论段是否处于已清洁/脏的部分)并在每次脏比率达到 min.cleanable.dirty.ratio 时重新复制它们。

墓碑被删除段明智。如果段满足以下条件,则删除段中的墓碑:

  1. 段应该在日志的清理部分。

  2. 段的最后修改时间应<=(包含偏移量=(脏偏移 - 1)的消息的段的最后修改时间)- delete.retention.ms。

很难详细说明第二点,但简单来说,第二点意味着 => 段大小应等于 log.segment.bytes/segment.bytes(默认为 1GB)。为了使段大小(在更干净的部分)等于 1GB,您需要生成大量具有不同键的消息。但是您只生成了 4 条消息,其中 3 条消息具有相同的密钥。这就是为什么在包含 1111:null 消息的段中不删除墓碑的原因(段不满足我上面提到的第二点)。

您有两个选项可以删除带有 4 条消息的墓碑:

  1. 使 delete.retention.ms=0 或
  2. 使 log.segment.bytes/segment.bytes=50。

源代码(额外阅读): https : //github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

try {
      // clean segments into the new destination segment
      for (old <- segments) {
        val retainDeletes = old.lastModified > deleteHorizonMs
        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
      }
Run Code Online (Sandbox Code Playgroud)