小编Mic*_*man的帖子

Kafka 0.9如何在使用KafkaConsumer手动提交偏移时重新使用消息

我正在编写一个消费者,一旦将一系列记录提交给Mongo,就会手动提交偏移量.
在发生Mongo错误或任何其他错误的情况下,尝试将记录持久化到错误处理集合以便在以后重播.如果Mongo失败,那么我希望消费者在尝试从Kakfa的非公开偏移中读取记录之前停止处理一段时间.
以下示例有效,但我想知道这种情况的最佳做法是什么?

while (true) {
    boolean commit = false;
    try {
        ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
        kafkaMessageProcessor.processRecords(records);
        commit = true;
    }
    catch (Exception e) {
        logger.error("Unable to consume closing consumer and restarting", e);
        try {
           consumer.close();
        }
        catch (Exception consumerCloseError) {
            logger.error("Unable to close consumer", consumerCloseError);
        }
        logger.error(String.format("Attempting recovery in  [%d] milliseconds.", recoveryInterval), e);
        Thread.sleep(recoveryInterval);
        consumer = createConsumer(properties);
    }
    if (commit) {
        consumer.commitSync();
    }

}

private KafkaConsumer<K, V> createConsumer(Properties properties) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
    consumer.subscribe(topics); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

8
推荐指数
2
解决办法
1万
查看次数

标签 统计

apache-kafka ×1

java ×1

kafka-consumer-api ×1