Kafka不会删除主题中的旧消息

sim*_*Pod 6 apache-kafka

在kafka中,我将保留政策设置为3天 server.properties

############################# Log Retention Policy #############################
...
log.retention.hours=72
...
Run Code Online (Sandbox Code Playgroud)

主题retention.ms设置为172800000(48h)。

但是,/ tmp / kafka-logs文件夹中仍然有旧数据,并且没有一个被删除。更改这些属性后,我等待了几个小时。

有什么需要设置的吗?当前正在产生和使用所有主题。

sim*_*Pod 6

关键是设置log.cleanup.policycompactdelete。我没有设置此属性。

运行:kafka-topics --zookeeper 127.0.0.1:2181 --topic topic1 --describe显示在主题上设置的属性,例如。Configs:retention.ms=172800000,cleanup.policy=compact

cleanup.policy必须设置。我也手动设置retention.ms/ retention.bytes来控制清理触发器。

编辑12. 11. 2019:根据kafka文档,似乎cleanup.policy应该默认为delete

  • 还请注意,日志段的大小也会影响此行为。 (2认同)

xma*_*mar 5

Compact政策只会紧缩来自键的值。也就是说,它最终将触发压缩过程,该过程将为键只留下一个(最终)值。但永远不要删除最后一个值。

为了按时间触发删除,您需要设置delete策略。在这种情况下,删除过程将删除早于给定数据的数据。

但是,您可以将策略设置为 compact,delete以利用同一主题的两个进程(在早期版本中不可用)。

然而,这些过程并不是次要的:它们最终会在某些条件下被触发,例如:

# The interval at which log segments are checked to see 
# if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
Run Code Online (Sandbox Code Playgroud)

(查看Kafka 文档中的更多条件,然后将保证删除超过阈值的数据。

另外,不同的时间粒度有不同的设置,并且有优先级(如果设置了,会忽略下一个)。确保没有意外的覆盖。有关详细信息,请查看综合文档。

  • 这是经纪人配置。也许您正在其他地方设置它?您可以在这里找到详细信息:https://kafka.apache.org/documentation/#brokerconfigs (2认同)