即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大尺寸的消息

Bra*_*avo 1 jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
        props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
        props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
        ***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
        props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
        props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
        **props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");** 
Run Code Online (Sandbox Code Playgroud)

错误如下

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Run Code Online (Sandbox Code Playgroud)

并且想要消费者配置我们需要使用我想要消费者端的kafka消息的字符串表示

tKe*_*tKe 5

不幸的是,您在 Kafka 的新 Producer 实现中遇到了一个相当奇怪的问题。

尽管 Kafka 在代理级别应用的消息大小限制应用于单个压缩记录集(可能是多个消息),但新的生产者目前max.request.size在任何压缩之前对记录应用限制。

这已在https://issues.apache.org/jira/browse/KAFKA-4169(创建于 14/Sep/16 并在撰写本文时未解决)中捕获。

如果您确定您的消息的压缩大小(加上记录集的任何开销)将小于代理的配置max.message.bytes,您可以通过增加max.request.size生产者上的属性值而逃脱,无需更改任何配置在经纪人上。这将允许生产者代码接受预压缩有效负载的大小,然后将其压缩并发送到代理。

但是,重要的是要注意,如果 Producer 尝试发送对于代理的配置来说太大的请求,代理将拒绝该消息,并且您的应用程序是否正确处理此问题将取决于您的应用程序。