启用压缩后的 Kafka 消息大小

Chr*_*tel 5 apache-kafka kafka-producer-api

我对 Kafka 2.6.0 中的消息大小配置有点困惑。但我们还是先讲个故事吧:

我们使用由 3 个节点组成的 Kafka 集群。到目前为止,消息的标准配置已经完成。“zstd 压缩”已激活。

相关broker配置很简单:

compression.type=zstd
Run Code Online (Sandbox Code Playgroud)

此时生产者配置也很简单:

compression.type=zstd
Run Code Online (Sandbox Code Playgroud)

现在我们想将 8 MB 的消息放入特定主题中。该数据的压缩大小仅为 200 kbyte。

如果我将此数据放入主题中,则会发生以下错误:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
Run Code Online (Sandbox Code Playgroud)

所以我改变了生产者配置,如下所示:

compression.type=zstd
max.request.size=10485760
Run Code Online (Sandbox Code Playgroud)

现在生产者接受更大的消息。但它仍然不起作用:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
Run Code Online (Sandbox Code Playgroud)

这是另一个错误消息。我不明白为什么会发生这种情况。

我认为这条消息与“message.max.bytes”属性有关。但我不明白为什么。这是该属性的文档:

Kafka 允许的最大记录批量大小(如果启用压缩,则在压缩后)。如果增加此值并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是分组为批次。在以前的消息格式版本中,未压缩的记录不会分组为批次,并且此限制仅适用于这种情况下的单个记录。这可以使用主题级别 max.message.bytes 配置针对每个主题进行设置。

我认为这意味着该参数与压缩消息大小有关,压缩消息大小为一些千字节。

有人能帮我吗?

Chr*_*tel 5

我找到了解决方案:

问题是 kafka-console- Producer.sh 忽略生产者配置中的compression.type。如果我显式调用

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt
Run Code Online (Sandbox Code Playgroud)

使用Compression.codec=zstd它可以工作,因为生产者压缩了消息。