发现这两个问题:这里和这里,但我仍然不太明白。我仍然有(意外的?)行为。
我尝试使用此配置来记录紧凑的 kafka 主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic test1 --config "cleanup.policy=compact" --config "delete.retention.ms=1000" --config "segment.ms=1000" --config "min.cleanable.dirty.ratio=0.01" --config "min.compaction.lag.ms=500"
然后我发送这些消息,每条消息至少有1秒的间隔
A: 3
A: 4
A: 5
B: 10
B: 20
B: 30
B: 40
A: 6
Run Code Online (Sandbox Code Playgroud)
我期望的是几秒钟后(配置为 1000?),当我运行时kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic test1 --from-beginning
,我应该得到
A: 6
B: 40
Run Code Online (Sandbox Code Playgroud)
相反,我得到:
A: 5
B: 40
A: 6
Run Code Online (Sandbox Code Playgroud)
如果我发布另一条消息B:50
并运行消费者,我得到:
B: 40
A: 6
B: 50
Run Code Online (Sandbox Code Playgroud)
而不是预期的
A: 6
B: 50
Run Code Online (Sandbox Code Playgroud)
基本上,您自己已经提供了答案。正如 Kafka 文档中所述,“日志压缩确保 Kafka 始终至少保留单个主题分区的数据日志中每个消息键的最后一个已知值”。因此,不能保证您始终会收到一键对应的一条消息。
如果我正确理解日志压缩,那么它并不适用于像您在非常有效的问题中提出的用例。相反,它的目的是最终达到主题中每个键仅存在一条消息的阶段。
日志压缩是一种提供更细粒度的每条记录保留的机制,而不是提供更粗粒度的基于时间的保留。这个想法是有选择地删除具有相同主键的最新更新的记录。这样可以保证日志至少具有每个键的最后状态。
如果您计划仅保留每个键的最新状态,并希望处理尽可能少的旧状态(非压缩主题的情况,取决于基于时间/大小的保留,压缩主题是正确的选择) )。据我所知,日志压缩的用例是保存最新的地址、手机号码、数据库中的值等。这些值不会每时每刻都在变化,并且通常有很多键。
从技术角度来看,我猜您的情况发生了以下情况。
当涉及到压缩时,日志被视为分为两部分
生成消息后B: 40
(A: 5
已经生成),clean
日志部分为空,该dirty/active
部分包含A: 5
和B: 40
。该消息A: 6
根本还不是日志的一部分。生成新消息A: 6
将开始对日志的脏部分(因为您的比率非常低)进行压缩,但不包括新消息本身。如前所述,没有什么需要清理的,因此新消息将仅添加到主题中,并且现在位于日志的脏部分中。您在生产时观察到的情况也会发生同样的情况B: 50
。
此外,压缩永远不会发生在您的活动段上。因此,即使您设置segment.ms
为 just它也不会生成新段,因为在生成或1000 ms
后没有新数据传入。A: 6
B: 50
为了解决您的问题并遵守期望,您需要C: 1
在生成A: 6
或后生成另一条消息B: 50
。这样,清洁器可以再次比较日志的干净部分和脏部分,并删除A: 5
或B: 40
。
同时,查看 Kafka 日志目录中的段的行为方式。
从我的角度来看,日志压缩的配置完全没问题!这并不是观察预期行为的正确用例。但对于生产用例,请注意您当前的配置会尝试非常频繁地启动压缩。根据数据量,这可能会变得相当 I/O 密集型。设置默认比率0.50
并且 log.roll.hours 通常设置为 24 小时是有原因的。此外,您通常希望确保消费者有机会在压缩之前读取所有数据。
归档时间: |
|
查看次数: |
2377 次 |
最近记录: |