kav*_*vin 4 java apache-kafka kafka-consumer-api
我目前正在从具有特定偏移量的主题中获取消息。我正在使用seek()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(commitSync()/commitAsync())时,Seek() 不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后一个承诺的偏移量。
那么当使用Seek() 时是否强制将偏移量存储在外部数据库中而不提交给 Kafka ?Seek 和 Commit 不会并行工作吗?
客户端版本 - kafka-clients - 2.4.0
谢谢!!
当您提交(自动或手动几乎没有区别)时,您将在代理端存储消费者已到达分区中多远的记录。这个提交的偏移量只在重新平衡的情况下使用,这样当消费者被分配到那个分区时,他们可以从已知所有先前消息都已处理的点开始。这提供了一个保证,只要消费者被正确编码,当消息被顺序处理时,在组成员身份发生变化的情况下,消息不会在消费时丢失。
当组成员稳定时,提交的偏移量什么也不做。每个消费者都有自己的内存偏移量,它维护并在每次从代理获取一批记录时使用。默认情况下,此偏移量按顺序增加。seek 方法仅更改此内存中的偏移量,以便下一次轮询将从您指定的任意偏移量中获取,除非它不存在,在这种情况下将抛出异常。
如果您在外部存储提交偏移量,则可以在重新平衡后使用 seek 来检索外部存储的偏移量并从那里获取,但在这种情况下,您必须在 RebalanceListener 中调用 seek - 如果您在 poll 之前调用 seek 它将没有效果由于消费者仅在轮询方法期间发现重新平衡和新分区分配,因此在轮询期间无需干预,它将从上次提交的偏移量开始消费。
当您暂停消费者时,也会出现这种稍微不直观的情况,我在https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html?m=1 上写了一些内容