Kafka Log Compacted Topic Duplication Values 未删除针对同一键

Dan*_*ish 5 apache-kafka kafka-producer-api apache-kafka-streams apache-kafka-connect kafka-topic

日志压缩主题不应该针对相同的键保留重复。但在我们的例子中,当发送具有相同键的新值时,不会删除前一个值。可能是什么问题?

val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
   (TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde)) 

Run Code Online (Sandbox Code Playgroud)

我得到的 实际结果

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)

我只想要针对相同关键预期结果的最新记录

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)

Gio*_*ous 5

据我知道这是不可能的,以保持应用日志压实政策正是每个键一个消息。即使您设置cleanup.policy=compact(主题级别)或log.cleanup.policy=compact(global level),也不能保证只保留最新的消息并压缩较旧的消息。

根据Kafka 官方文档

日志压缩为我们提供了更细粒度的保留机制,以便我们保证至少保留每个主键的最后一次更新


mik*_*ike 5

这种行为可能有多种原因。压缩清理策略不会在每条传入消息后运行。相反,有代理配置

log.cleaner.min.compaction.lag.ms:消息在日志中保持未压缩的最短时间。仅适用于正在压缩的日志。

类型:长;默认值:0;有效值:更新模式:集群范围

这是默认的,0所以这可能不是原因,但值得检查。

需要注意的是,该compact策略从不压缩当前段。消息只有在非活动段上才有资格进行压缩。确保验证

log.segment.bytes:单个日志文件的最大大小

类型:int;默认:1073741824;有效值:[14,...]; 更新模式:集群范围

压缩通常由日志当前(“脏”)段中的数据触发。“脏”一词来自未清洁/未压缩。还有另一种配置有助于引导压实。

log.cleaner.min.cleanable.ratio:对于符合清理条件的日志,脏日志与总日志的最小比率。如果还指定了 log.cleaner.max.compaction.lag.ms 或 log.cleaner.min.compaction.lag.ms 配置,则日志压缩器会在以下任一情况下认为该日志符合压缩条件:(i)已达到脏比率阈值并且日志至少在 log.cleaner.min.compaction.lag.ms 持续时间内有脏(未压缩)记录,或者(ii)如果日志最多有脏(未压缩)记录log.cleaner.max.compaction.lag.ms 周期。

类型:双人;默认值:0.5;有效值:;更新模式:集群范围

默认情况下,要压缩的消息的删除延迟非常高,如以下配置说明所示。

log.cleaner.max.compaction.lag.ms:消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。

类型:长;默认:9223372036854775807;有效值:更新模式:集群范围

总而言之,您观察所描述的内容可能有多种原因。而且要知道,一个压缩的主题仍是非常重要的提供任何担保,有重复的消息相同的密钥。它只能保证“至少”保留同一密钥的最新消息。

有一个很好的博客,更详细地解释了日志压缩。