获取Kafka压缩消息大小

use*_*252 7 java apache-kafka kafka-producer-api apache-kafka-connect

我想知道 kafka 中消息的压缩大小。

我使用 kafka 1.1.0 和 java kafka-connect 1.1.0 将消息从我的生产者发送到主题。

如果消息对我的制作人来说太大,我会收到

该消息在序列化时为 xxx 字节,大于您使用 max.request.size 配置配置的最大请求大小。

将 max.request.size 设置为合适的值会导致来自代理的错误消息,因为 message.max.bytes 也必须在代理配置中进行相应调整。不幸的是,错误消息不包括代理收到的消息的大小。我调整了 message.max.bytes。到现在为止还挺好。

如果我在生产者端激活压缩,max.request.size 仍然必须与没有压缩的大小相同,因为不幸的是,代码在压缩之前比较了未压缩消息的大小(请参阅https://issues.apache .org/jira/browse/KAFKA-4169 )

但是通过压缩,我将能够减少代理中的 message.max.bytes。问题是我在任何时候都无法确定此压缩消息的大小。有没有办法在发送消息之前或稍后在日志文件中在生产者代码中弄清楚这一点?

在我使用压缩的情况下,message.max.bytes 的默认值 1MB 就足够了,所以我不必更改默认配置。但我想知道我的压缩消息是远低于 1MB 还是只有 0.99MB。在这种情况下,我可能会在生产中增加 message.max.bytes 以避免出现问题。

提前感谢您的支持。

Cap*_*ngs 2

您可以做的是使用压缩库,自己压缩消息,在发送之前检查大小。例如,假设您使用 lz4 压缩,则可以使用 lz4-java lib,然后使用类似以下内容的内容:

private static LZ4Compressor COMPRESS = LZ4Factory.fastestInstance().highCompressor();

String meMessageString      = "My Message that I am sending to kafka";
byte[] uncompressedBytes    = jsonRequest.getBytes();
long lz4compressedLength    = COMPRESSOR.compress(uncompressedBytes).length;
Run Code Online (Sandbox Code Playgroud)