Kafka - 如何使用高级消费者在每条消息之后提交偏移量?

Hon*_* Li 24 message-queue distributed-transactions apache-kafka apache-zookeeper

我正在使用Kafka的高级消费者.因为我正在使用Kafka作为我的应用程序的"事务队列",所以我需要确保我不会错过或重读任何消息.我有两个问题:

  1. 如何将偏移量提交给zookeeper?在每次成功使用消息后,我将关闭自动提交和提交偏移量.我似乎无法找到如何使用高级消费者执行此操作的实际代码示例.谁能帮我这个?

  2. 另一方面,我听说承诺动物园管理员可能会很慢,所以另一种方式可能是在本地跟踪偏移?这种替代方法是否可取?如果是的话,你会怎么做?

小智 25

您可以先禁用自动提交: auto.commit.enable=false

然后在获取消息后提交: consumer.commitOffsets(true)


lau*_*man 22

http://kafka.apache.org/documentation.html#consumerconfigs有两个相关设置.

auto.commit.enable
Run Code Online (Sandbox Code Playgroud)

auto.commit.interval.ms
Run Code Online (Sandbox Code Playgroud)

如果你想设置它使得消费者在每条消息之后提交偏移量,那将是困难的,因为唯一的设置是在定时器间隔之后,而不是在每条消息之后.您将不得不对传入的消息进行一些速率预测,并相应地设置时间.

一般情况下,不建议将此间隔保持得太小,因为它会大大增加zookeeper中的读/写速率,并且zookeeper会因为它在仲裁中的速度非常一致而变慢.

  • 您如何看待使用commitOffsets()方法? (4认同)