小编Har*_*mar的帖子

当稍后的消息首先被消费时,Kafka消费者偏移提交

我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下——

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for  (ConsumerRecord<String, String> record : records) {
        DoSomeProcessing (record.value());
    }
    consumer.commitAsync();
}

private void DoSomeProcessing(String record) {
    //make an external call to a system which can take random time for different requests or timeout in 5 seconds.
}
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,如果生成了后面的记录但前面的记录仍未超时,则如何提交或提交哪个偏移量。
假设我批量获取 2 条记录,第一条消息的外部调用仍在等待,第二条消息的外部调用已完成。如果我等待 5 秒的外部响应,Kafka 消息的消耗在某些情况下可能会变得非常慢。如果我在进行另一次轮询之前不等待第一个请求完成,我应向 Kafka 提交多少偏移量?如果我提交 2,并且消费者崩溃,则第一条消息将丢失,因为下次最新提交的偏移量将为 2。

apache-kafka kafka-consumer-api

4
推荐指数
1
解决办法
7180
查看次数

标签 统计

apache-kafka ×1

kafka-consumer-api ×1