KafkaConsumer.close() 为什么?

Moh*_*med 3 java memory-leaks apache-kafka kafka-consumer-api java-threads

我在这里询问关闭 kafka 消费者。即使使用线程退出,我是否需要关闭 kafka 使用者?关闭它会不会因为任何变化而泄漏资源?

这是一个代码示例:

public class MyThread extends Thread{

    private KafkaConsumer<String, Message> kafkaConsumer;


    @Override
    public void run() {
        kafkaConsumer = initConsumer();
        while(true){
            kafkaconsumer.poll(1000000)
            //Code goes here.
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

是否kafkaConsumer在关闭MyThread退出使用System.exit

Luc*_*ini 7

引自 Gwen Shapira、Neha Narkhede、Todd Palino(O'Reilly Media)的“卡夫卡——权威指南”:

在退出之前始终关闭()消费者。这将关闭网络连接和套接字它还会立即触发重新平衡,而不是等待组协调器发现消费者停止发送心跳并且可能已经死亡,这将花费更长的时间,因此导致消费者无法在更长的时间内消费来自分区的子集。

Kafka - 权威指南

  • 这本书是为 Kafka 0.9.0.1 版本编写的。从那时起,0.10.0.2 版本的关闭机制升级如下“Java 消费者现在可以正常关闭。默认情况下,消费者等待最多 30 秒来完成挂起的请求。A KafkaConsumer 中添加了带超时的新关闭 API 以控制最长等待时间。” https://kafka.apache.org/10/documentation.html (3认同)
  • 它还可以提交偏移量:http://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration- (2认同)