ara*_*ran 13 apache-kafka kafka-consumer-api
关于KafkaConsumer(> = 0.9),我正面临一些严重的问题,试图为我的需求实施解决方案.
让我们假设我有一个函数必须只读取来自kafka主题的n条消息.
例如:getMsgs(5)- > 在主题中获取下一个5 kafka消息.
所以,我有一个看起来像这样的循环:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
Run Code Online (Sandbox Code Playgroud)
考虑到这一点,问题是poll()方法可以获得超过5条消息.例如,如果它获得10条消息,我的代码将永远忘记其他5条消息,因为Kafka会认为它们已经消耗掉了.
我尝试提交偏移但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Run Code Online (Sandbox Code Playgroud)
即使使用偏移配置,每当我再次启动消费者时,它都不会从第6条消息开始(记住,我只想要5条消息),但是从第11条开始(因为第一次轮询消耗了10条消息).
有没有解决方案呢,或者(最肯定的)我错过了什么?
提前致谢!!
use*_*400 13
您可以设置max.poll.records为您喜欢的任何数字,这样您最多可以在每次轮询中获得那么多记录.
对于您在此问题中声明的用例,您不必自己明确提交偏移量.你可以设置enable.auto.commit到true和设置auto.offset.reset,以earliest使得它会踢时,有没有消费group.id(当你即将开始从分区阅读的第一次换句话说).一旦你有一个group.id和一些消费者偏移量存储在Kafka中,并且你的Kafka消费者进程死亡,它将从最后一次提交的偏移量继续,因为它是默认行为,因为当消费者启动时它将首先查找是否有任何承诺抵消,如果是,将从最后承诺的抵消继续,auto.offset.reset 不会启动.
| 归档时间: |
|
| 查看次数: |
29715 次 |
| 最近记录: |