如果我在主题级别和生产者级别设置“compression.type”,则优先

Raj*_*Raj 7 apache-kafka

我试图了解“compression.type”配置,我的问题是,如果我在主题级别和生产者级别设置“compression.type”,哪个优先?

Ash*_*san 16

当代理收到来自生产者的一批压缩消息时:

  • 它总是解压缩数据以验证它
  • 它考虑目标主题的压缩编解码器
    • 如果目标主题的压缩编解码器是producer,或者如果批处理和目标主题的编解码器相同,则代理从客户端获取压缩的批处理并将其直接写入主题的日志文件,而无需重新压缩数据。
    • 否则,代理需要重新压缩数据以匹配目标主题的编解码器。

如果生产者运行 0.10 之前的版本,因为需要覆盖偏移量,或者需要任何其他消息格式转换,也可能会发生解压缩和重新压缩。


nan*_*ini 5

我尝试了一些实验来回答这个问题:

**Note:** server.properties has the config compression.type=producer 
Run Code Online (Sandbox Code Playgroud)

./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1--config compression.type=producer --topic t

./kafka-console-producer.sh --broker-list node:6667  --topic t
./kafka-console-producer.sh --broker-list node:6667  --topic t --compression-codec gzip
./kafka-console-producer.sh --broker-list node:6667  --topic t

sh kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /kafka-logs/t-0/00000000000000000000.log

Dumping /kafka-logs/t-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 compresscodec: NONE 
offset: 1 position: 69 compresscodec: GZIP 
offset: 2 position: 158 compresscodec: NONE 
Run Code Online (Sandbox Code Playgroud)

./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1--config compression.type=gzip --topic t1

./kafka-console-producer.sh --broker-list node:6667  --topic t1
./kafka-console-producer.sh --broker-list node:6667  --topic t1 --compression-codec gzip
./kafka-console-producer.sh --broker-list node:6667  --topic t1 --compression-codec snappy

 sh kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /kafka-logs/t1-0/00000000000000000000.log
Dumping /kafka-logs/t1-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 compresscodec: GZIP 
offset: 1 position: 89 compresscodec: GZIP 
offset: 2 position: 178 compresscodec: GZIP 
Run Code Online (Sandbox Code Playgroud)

显然,该主题占据了主导地位。

来自Kafka的wrt压缩和解压文本——权威指南

然而,Kafka 代理必须解压缩所有消息批次,以验证各个消息的校验和并分配偏移量。然后它需要重新压缩消息批处理以将其存储在磁盘上。

从 0.10 版开始,有一种新的消息格式,允许在消息批处理中使用相对偏移量。这意味着新的生产者将在发送消息批次之前设置相对偏移量,这允许代理跳过消息批次的重新压缩。

因此,当压缩类型不同时,将遵循主题压缩。如果相同,则保留生产者设置的原始压缩编解码器。

参考 - https://kafka.apache.org/documentation/