Kafka 主题压缩

cod*_*ker 5 apache-kafka

我习惯于kafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100压缩我的主题。我已经使用相同的密钥发送了 2000 条消息。当我使用这些消息时,我会单独获取每条消息,而不是一条压缩消息。

gro*_*roo 2

您所指的压缩设置与您如何通过 Kafka 客户端使用消息无关。请在此处查看官方文档以了解更多详细信息。

如果您想控制客户端使用消息的方式,则必须使用客户端配置属性来配置客户端。

考虑这样一种情况,您将主题池化 300 毫秒并接收一组消息(ConsumerRecords),然后您可以对其进行迭代以独立处理每条消息。

while(true) {
   ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300);
       if(records.count() > 0) {
          for(ConsumerRecord<String, JsonNode> record: records) {
             if(counter % 500 == 0) {
                 log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}",
                 this.groupId, this.topicNames, record.key(), record.value(), record.offset());
                    }
                }
            }
        }
Run Code Online (Sandbox Code Playgroud)