我正在编写一个消费者,一旦将一系列记录提交给Mongo,就会手动提交偏移量.
在发生Mongo错误或任何其他错误的情况下,尝试将记录持久化到错误处理集合以便在以后重播.如果Mongo失败,那么我希望消费者在尝试从Kakfa的非公开偏移中读取记录之前停止处理一段时间.
以下示例有效,但我想知道这种情况的最佳做法是什么?
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics); …Run Code Online (Sandbox Code Playgroud)