Bra*_*avo 1 jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
**props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");**
Run Code Online (Sandbox Code Playgroud)
错误如下
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Run Code Online (Sandbox Code Playgroud)
并且想要消费者配置我们需要使用我想要消费者端的kafka消息的字符串表示
不幸的是,您在 Kafka 的新 Producer 实现中遇到了一个相当奇怪的问题。
尽管 Kafka 在代理级别应用的消息大小限制应用于单个压缩记录集(可能是多个消息),但新的生产者目前max.request.size在任何压缩之前对记录应用限制。
这已在https://issues.apache.org/jira/browse/KAFKA-4169(创建于 14/Sep/16 并在撰写本文时未解决)中捕获。
如果您确定您的消息的压缩大小(加上记录集的任何开销)将小于代理的配置max.message.bytes,您可以通过增加max.request.size生产者上的属性值而逃脱,而无需更改任何配置在经纪人上。这将允许生产者代码接受预压缩有效负载的大小,然后将其压缩并发送到代理。
但是,重要的是要注意,如果 Producer 尝试发送对于代理的配置来说太大的请求,代理将拒绝该消息,并且您的应用程序是否正确处理此问题将取决于您的应用程序。
| 归档时间: |
|
| 查看次数: |
1239 次 |
| 最近记录: |