防止卡夫卡消费者长时间超时

IS_*_*_EV 3 apache-kafka kafka-consumer-api

我需要防止kafka用户在应用程序等待特定过程完成时超时。我的方法是暂停分区,然后在过程完成后恢复它们。

List<TopicPartition> partitionList = new ArrayList<>(); 
partitionList.addAll(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);

while(//waiting for the process to complete){
    Thread.sleep(10000);                            
    kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);
Run Code Online (Sandbox Code Playgroud)

问题

暂停会自动将心跳发送到kafka吗?还是我仍需要定期轮询才能发送心跳?

我的最好方法是吗?还是有更好的方法呢?

Mat*_*Sax 7

从Kafka 0.10.1开始,消费者确实具有用于发送心跳的后台线程:https : //cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+ a +背景+线程

因此,您无需致电poll()即可向代理发送心跳。但是,还有第二次超时max.poll.interval.ms-您必须poll()在此时间内致电,以避免第二次超时。默认值为5分钟。如果您的等待时间甚至更长,则可以增加此超时时间。如果这样做,您也不需要暂停任何分区等。

如果您使用的是旧版本,则可以暂停,而poll()定期调用是发送常规心跳以避免超时的唯一方法。