日志压缩以确保每个键只保留一条消息

Gio*_*ous 6 apache-kafka

我想创建一个包含唯一键及其对应的最新值的主题。因此,当将带有现有键的消息插入主题时,旧消息将被删除。

为此,我在server.properties文件中配置了以下参数:

log.cleaner.enable=true
log.cleanup.policy=compact

# The minimum age of a log file to be eligible for deletion due to age
log.retention.minutes=3

log.retention.bytes=10737418

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# The maximum time before a new log segment is rolled out (in milliseconds).
# If not set, the value in log.roll.hours is used
log.roll.ms=600000
Run Code Online (Sandbox Code Playgroud)

因此应每 3 分钟进行一次压实。为了测试压缩策略,我创建了一个主题retention_test

kafka-topics --zookeeper localhost:2181 --create --topic retention_test --replication-factor 1 --partitions 1
Run Code Online (Sandbox Code Playgroud)

并使用控制台使用者,kafka-console-producer --broker-list localhost:9092 --topic retention_test --property parse.key=true --property key.separator=:我产生了以下消息:

>1:first
>2:second
>3:third
Run Code Online (Sandbox Code Playgroud)

控制台消费者kafka-console-consumer --bootstrap-server localhost:9092 --topic retention_test --from-beginning成功消费它们的地方;

first
second
third
Run Code Online (Sandbox Code Playgroud)

现在,当我尝试使用已添加的键插入消息时,旧消息似乎没有被忽略并保留在主题中:

在生产者方面:

>1:updatedFirst
Run Code Online (Sandbox Code Playgroud)

请注意,为了测试行为,在 3 分钟的保留期过去很久之后,我已多次重新启动使用者。输出是

first
second
third
updatedFirst
Run Code Online (Sandbox Code Playgroud)

所需的输出应该是

second
third
updatedFirst
Run Code Online (Sandbox Code Playgroud)

因为firstupdatedFirst具有相同的密钥。

根据文档

日志压缩为我们提供了更细粒度的保留机制,以便我们保证至少保留每个主键的最后一次更新

是否有可能保持正好一个消息(最近的一次),每个键代替至少一个消息(包括最近的一次)?

Mic*_*cki 8

我会说这通常是不可能的。Kafka 将消息存储在每个主题的每个分区的段中。每个段都是一个文件,它们只会被附加到(或作为一个整体删除)。压缩只能通过重写现有的段文件来跳过那些具有相同密钥的后续消息的消息。然而,头段(当前新消息被附加到的那个段)不会被压缩(直到创建一个成为头段的新段)。

你通过log.retentionconfig配置的3分钟在播放时不log.cleanup.policy=compact生效,只在播放时有效log.cleanup.policy=delete

为什么对于给定的键只有一条消息很重要?如果您提供有关您的用例的更多信息,也许可以建议另一种方法。

  • 我懂了!因此,具有相同密钥的两条消息可能都属于头部段,因此将被压缩。感谢您的解释。 (3认同)