卡夫卡消费者没有正确提交偏移量

Ish*_*wal 3 spring apache-kafka spring-boot kafka-consumer-api spring-kafka

我有一个使用以下属性定义的 Kafka 消费者:

session.timeout.ms = 60000
heartbeat.interval.ms = 6000
Run Code Online (Sandbox Code Playgroud)

我们注意到大约 2000 条消息的滞后,并看到消费者多次使用相同的消息(通过我们的应用程序日志)。另外,注意到一些消息需要大约 10 秒才能完全处理。我们怀疑消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者接收到了相同的消息。

为了解决这个问题,我们引入了更多的属性:

auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go

And, we set the concurrency to 1.
Run Code Online (Sandbox Code Playgroud)

这解决了我们的问题。滞后开始减少并最终变为 0。

但是,我仍然不清楚为什么会首先出现问题。据我了解,默认情况下:

enable.auto.commit = true
auto.commit.interval.ms=5000
Run Code Online (Sandbox Code Playgroud)

因此,理想情况下,消费者应该每 5 秒提交一次。如果在此时间范围内未完全处理消息,会发生什么情况?消费者正在提交什么偏移量?问题是否是由于轮询记录大小过大(默认为 500)导致的

另外,关于 poll() 方法,我读到:

poll() 调用在后台以 set auto.commit.interval.ms 发出。

那么,最初如果 poll() 更早地每 5 秒发生一次(默认为 auto.commit.interval),为什么它不提交最新的偏移量?因为消费者还没有完成处理吗?然后,它应该在接下来的第 5 秒提交该偏移量。

有人可以回答这些查询并解释为什么会出现原始问题吗?

Gar*_*ell 5

如果您使用 Spring for Apache Kafka,我们建议设置enable.auto.commit为,false以便容器以更具确定性的方式提交偏移量(在每条记录之后或每批记录之后 - 默认)。

最有可能的问题是max.poll.interval.ms默认情况下是 5 分钟。如果您的一批消息花费的时间比这更长,您就会看到这种行为。您可以增加max.poll.interval.ms或像您所做的那样减少max.poll.records

关键是您必须在小于max.poll.interval.ms.

另外,关于 poll() 方法,我读到:

poll() 调用在后台以 set auto.commit.interval.ms 发出。

那是不正确的;poll() 不会在后台调用;自 KIP-62 以来,心跳在后台发送。