IS_*_*_EV 3 apache-kafka kafka-consumer-api
我需要防止kafka用户在应用程序等待特定过程完成时超时。我的方法是暂停分区,然后在过程完成后恢复它们。
List<TopicPartition> partitionList = new ArrayList<>();
partitionList.addAll(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);
while(//waiting for the process to complete){
Thread.sleep(10000);
kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);
Run Code Online (Sandbox Code Playgroud)
问题
暂停会自动将心跳发送到kafka吗?还是我仍需要定期轮询才能发送心跳?
我的最好方法是吗?还是有更好的方法呢?
从Kafka 0.10.1开始,消费者确实具有用于发送心跳的后台线程:https : //cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+ a +背景+线程
因此,您无需致电poll()即可向代理发送心跳。但是,还有第二次超时max.poll.interval.ms-您必须poll()在此时间内致电,以避免第二次超时。默认值为5分钟。如果您的等待时间甚至更长,则可以增加此超时时间。如果这样做,您也不需要暂停任何分区等。
如果您使用的是旧版本,则可以暂停,而poll()定期调用是发送常规心跳以避免超时的唯一方法。
| 归档时间: |
|
| 查看次数: |
1545 次 |
| 最近记录: |