Bog*_*dan 5 java memory-leaks apache-kafka
我正在构建一个 Java 8 应用程序,该应用程序查询 Kafka 主题以获取一条消息。每个请求都会创建一个新Consumer对象(独立于任何现有Consumer对象),它轮询我的 Kafka 主题,获取一条记录,然后Consumer关闭。这种情况每天发生约 20 万次,并且每个请求都独立于所有其他请求,因此我认为我无法重用消费者。基本上,用户从主题请求消息并为他们创建消费者,然后关闭。这种情况平均每秒发生约 2 次,但它是任意的,因此可能发生 10 次/秒或 1 次/小时,无法知道。
一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得很大,垃圾收集无法清除它。最终,更多的 CPU 时间专门用于 GC,并且一切都崩溃了,直到我重新启动 Kafka。
这是导致问题的代码的近似版本,具有while(true)近似的真实行为(在生产中,消费者不是在 while 循环中创建的,而是在用户从主题请求消息时按需创建的):
Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));
// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If I remove this line ^, the memory leak does not happen
/* CODE TO GET ONE RECORD */
consumer.unsubscribe();
consumer.close();
}
Run Code Online (Sandbox Code Playgroud)
在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。这是 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)的样子:

我做错了什么(或者有更好的方法来解决这个问题),还是当创建和关闭很多消费者时这是 Kafka 中的一个错误?
我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。
无论您收到的请求数量和频率如何,您仍然可以重用 KafkaConsumer 实例。您只能在请求到达时进行轮询,但不需要每次都创建并关闭消费者。
话虽如此,如果内存使用量增加并且未被 GC 回收,您对消费者的使用可能会暴露代理上的内存管理问题。我发现当生产者被频繁回收时,代理会耗尽直接内存。因此,这方面可能还有改进的余地。可能最好在 issues.apache.org 上提交一张票来查看它。
| 归档时间: |
|
| 查看次数: |
4726 次 |
| 最近记录: |