我有一个 spring-boot (2.1.3) 服务将消息发布到 kafka(2.12-2.3.0) 主题。服务创建主题,稍后,在服务启动后,将保留时间设置为 1 秒。
@SpringBootApplication()
@EnableAsync
public class MetricsMsApplication {
public static void main(String[] args) {
SpringApplication.run(MetricsMsApplication.class, args);
}
@Bean
public NewTopic topic1() {
NewTopic nt = new NewTopic("metrics", 10, (short) 1);
return nt;
}
@EventListener(ApplicationReadyEvent.class)
private void init() throws ExecutionException, InterruptedException {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();
}
Run Code Online (Sandbox Code Playgroud)
}
我发送了几条消息并数到 5,然后启动一个控制台使用者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic admst-metrics --from-beginning
并且仍然收到本应过期的消息。
kafka 日志显示应用了retention.ms 配置。我添加了 cleanup.policy 并将其设置为删除,但这不是必需的,因为它是默认设置。
什么会使这些消息被删除?
简短的回答 - kafka 不是为了满足如此低的保留值而设计的。
更长的答案:
Kafka 将任何(主题)分区的数据存储在段文件中。在任何时候,当所有旧段都“关闭”时,单个段是“活动的”并被写入。保留/压缩仅适用于非活动段。
Kafka rolls new segments when either log.roll.ms or log.segment.bytes is hit. The defaults (see https://kafka.apache.org/documentation/#brokerconfigs) are 7 days and/or ~1GB.
There's also log.segment.delete.delay.ms which by default means any segment is retained for at least a minute.
The work of compacting/deleting non-active segments is done by log cleaner threads. Those sleep for log.cleaner.backoff.ms (15 seconds) when no work is found and only check if any particular segment can be cleaned every log.retention.check.interval.ms (5 minutes)
The result of all this is that retention values anywhere near what you're looking for are not possible by default.
You could try tweaking all the above values, and see how low you can go, but I'm betting this won't scale well for a large number of topics.
| 归档时间: |
|
| 查看次数: |
1552 次 |
| 最近记录: |