多个消费者触发的Kafka broker内存泄漏

Bog*_*dan 5 java memory-leaks apache-kafka

我正在构建一个 Java 8 应用程序,该应用程序查询 Kafka 主题以获取一条消息。每个请求都会创建一个新Consumer对象(独立于任何现有Consumer对象),它轮询我的 Kafka 主题,获取一条记录,然后Consumer关闭。这种情况每天发生约 20 万次,并且每个请求都独立于所有其他请求,因此我认为我无法重用消费者。基本上,用户从主题请求消息并为他们创建消费者,然后关闭。这种情况平均每秒发生约 2 次,但它是任意的,因此可能发生 10 次/秒或 1 次/小时,无法知道。

一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得很大,垃圾收集无法清除它。最终,更多的 CPU 时间专门用于 GC,并且一切都崩溃了,直到我重新启动 Kafka。

这是导致问题的代码的近似版本,具有while(true)近似的真实行为(在生产中,消费者不是在 while 循环中创建的,而是在用户从主题请求消息时按需创建的):

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);

while(true){
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));

    // I've narrowed down the memory leak to this line
    ConsumerRecords<String, String> cr = consumer.poll(1000); 
    // If I remove this line ^, the memory leak does not happen

    /* CODE TO GET ONE RECORD */

    consumer.unsubscribe();
    consumer.close();
}
Run Code Online (Sandbox Code Playgroud)

在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。这是 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)的样子: 卡夫卡内存泄漏

我做错了什么(或者有更好的方法来解决这个问题),还是当创建和关闭很多消费者时这是 Kafka 中的一个错误?

我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。

Mic*_*cki 2

无论您收到的请求数量和频率如何,您仍然可以重用 KafkaConsumer 实例。您只能在请求到达时进行轮询,但不需要每次都创建并关闭消费者。

话虽如此,如果内存使用量增加并且未被 GC 回收,您对消费者的使用可能会暴露代理上的内存管理问题。我发现当生产者被频繁回收时,代理会耗尽直接内存。因此,这方面可能还有改进的余地。可能最好在 issues.apache.org 上提交一张票来查看它。