当稍后的消息首先被消费时,Kafka消费者偏移提交

Har*_*mar 4 apache-kafka kafka-consumer-api

我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下——

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for  (ConsumerRecord<String, String> record : records) {
        DoSomeProcessing (record.value());
    }
    consumer.commitAsync();
}

private void DoSomeProcessing(String record) {
    //make an external call to a system which can take random time for different requests or timeout in 5 seconds.
}
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,如果生成了后面的记录但前面的记录仍未超时,则如何提交或提交哪个偏移量。
假设我批量获取 2 条记录,第一条消息的外部调用仍在等待,第二条消息的外部调用已完成。如果我等待 5 秒的外部响应,Kafka 消息的消耗在某些情况下可能会变得非常慢。如果我在进行另一次轮询之前不等待第一个请求完成,我应向 Kafka 提交多少偏移量?如果我提交 2,并且消费者崩溃,则第一条消息将丢失,因为下次最新提交的偏移量将为 2。

Dmi*_*sky 6

我认为您正确地分析了问题,答案可能就是您所怀疑的:在处理小于和等于该偏移量的每个偏移量之前,您无法提交偏移量。这就是 Kafka 的工作原理:它非常注重强排序。

\n\n

解决方案是增加分区和使用者的数量,以便获得所需的并行性。从某些角度来看这并不是很好\xe2\x80\x94你需要更多的线程和资源\xe2\x80\x94但至少你可以编写同步代码。

\n