Dan*_*ish 5 apache-kafka kafka-producer-api apache-kafka-streams apache-kafka-connect kafka-topic
日志压缩主题不应该针对相同的键保留重复项。但在我们的例子中,当发送具有相同键的新值时,不会删除前一个值。可能是什么问题?
val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
(TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde))
Run Code Online (Sandbox Code Playgroud)
我得到的 实际结果
Offsets Keys Messages
5 {"id":5} {"id":5,"namee":"omer","__deleted":"false"}
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)
我只想要针对相同关键预期结果的最新记录
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)
据我知道这是不可能的,以保持应用日志压实政策正是每个键一个消息。即使您设置cleanup.policy=compact(主题级别)或log.cleanup.policy=compact(global level),也不能保证只保留最新的消息并压缩较旧的消息。
根据Kafka 官方文档:
日志压缩为我们提供了更细粒度的保留机制,以便我们保证至少保留每个主键的最后一次更新
这种行为可能有多种原因。压缩清理策略不会在每条传入消息后运行。相反,有代理配置
log.cleaner.min.compaction.lag.ms:消息在日志中保持未压缩的最短时间。仅适用于正在压缩的日志。
类型:长;默认值:0;有效值:更新模式:集群范围
这是默认的,0所以这可能不是原因,但值得检查。
需要注意的是,该compact策略从不压缩当前段。消息只有在非活动段上才有资格进行压缩。确保验证
log.segment.bytes:单个日志文件的最大大小
类型:int;默认:1073741824;有效值:[14,...]; 更新模式:集群范围
压缩通常由日志当前(“脏”)段中的数据触发。“脏”一词来自未清洁/未压缩。还有另一种配置有助于引导压实。
log.cleaner.min.cleanable.ratio:对于符合清理条件的日志,脏日志与总日志的最小比率。如果还指定了 log.cleaner.max.compaction.lag.ms 或 log.cleaner.min.compaction.lag.ms 配置,则日志压缩器会在以下任一情况下认为该日志符合压缩条件:(i)已达到脏比率阈值并且日志至少在 log.cleaner.min.compaction.lag.ms 持续时间内有脏(未压缩)记录,或者(ii)如果日志最多有脏(未压缩)记录log.cleaner.max.compaction.lag.ms 周期。
类型:双人;默认值:0.5;有效值:;更新模式:集群范围
默认情况下,要压缩的消息的删除延迟非常高,如以下配置说明所示。
log.cleaner.max.compaction.lag.ms:消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。
类型:长;默认:9223372036854775807;有效值:更新模式:集群范围
总而言之,您观察所描述的内容可能有多种原因。而且要知道,一个压缩的主题仍是非常重要的不提供任何担保,有重复的消息相同的密钥。它只能保证“至少”保留同一密钥的最新消息。
有一个很好的博客,更详细地解释了日志压缩。
| 归档时间: |
|
| 查看次数: |
758 次 |
| 最近记录: |