Kafka制作人TimeoutException:过期1条记录

Pra*_*dey 15 apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

我正在使用Kafka和Spring-boot:

卡夫卡制片人班:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

卡夫卡的配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093
Run Code Online (Sandbox Code Playgroud)

假设我已经在主题中发送了10次10​​00条消息my-test-topic.

10次​​中有8次我成功获取了我的消费者中的所有消息但有时我收到以下错误:

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time

mic*_*brz 9

有三种可能性:

  1. 增加request.timeout.ms- 这是Kafka等待整批准备在缓冲区中的时间.因此,在您的情况下,如果缓冲区中的消息少于100 000条,则会发生超时.更多信息:https://stackoverflow.com/a/34794261/2707179
  2. 减少batch-size- 与前一点相关,它将更频繁地发送批次,但它们将包括更少的消息.
  3. 根据邮件大小,您的网络可能无法赶上高负载?检查您的吞吐量是否不是瓶颈.

  • 自从我在Kafka上启用SSL以来,我遇到了与OP相同的问题,并注意到,像我一样,他设置了`linger.ms`.根据文档,即使批次未满,也会在此停留时间之后发送批次,因此即使批次较大,也不应超时. (6认同)
  • `request.timeout.ms` 告诉批次在超时之前可以在缓冲区中保留多长时间。这不是 Kafka 等待缓冲区被填满的时间。 (3认同)
  • @michalbrz阅读并理解这两篇文章后:1)/sf/answers/2435598301/和2)https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html。我觉得我们应该增加批量大小以避免超时。如果我们增加batch-size -&gt; 批次数会减少 -&gt; 请求数会减少 -&gt; 发送记录的时间会减少 -&gt; 超时不会频繁发生 (2认同)