我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下——
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeProcessing (record.value());
}
consumer.commitAsync();
}
private void DoSomeProcessing(String record) {
//make an external call to a system which can take random time for different requests or timeout in 5 seconds.
}
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是,如果生成了后面的记录但前面的记录仍未超时,则如何提交或提交哪个偏移量。
假设我批量获取 2 条记录,第一条消息的外部调用仍在等待,第二条消息的外部调用已完成。如果我等待 5 秒的外部响应,Kafka 消息的消耗在某些情况下可能会变得非常慢。如果我在进行另一次轮询之前不等待第一个请求完成,我应向 Kafka 提交多少偏移量?如果我提交 2,并且消费者崩溃,则第一条消息将丢失,因为下次最新提交的偏移量将为 2。