Spring Kafka 不尊重 max.poll.records 的奇怪行为

use*_*exp 7 spring apache-kafka spring-kafka

好吧,我正在尝试以下场景:

  1. 在 application.properties 中将 max.poll.records 设置为 50。
  2. 在 application.properties 中设置 enable-auto-commit=false 和 ack-mode 为手动。
  3. 在我的方法中添加了@KafkaListener,但不提交任何消息,只是读取、记录但不做出ACK。

实际上,在我的 Kafka 主题中,我有 500 条消息要使用,所以我期望出现以下行为:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在下一次 Spring Kafka poll() 调用中,获取与步骤 1 相同的 50 条消息(偏移量 0 到 50)。根据我的理解,Spring Kafka 应该继续此循环(步骤 1-3),始终读取相同的消息。

但发生的情况如下:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在下一次 Spring Kafka poll() 调用中,获取下 50 条消息,与步骤 1 不同(偏移 50 到 100)。

Spring Kafka 以 50 条消息为一组读取 500 条消息,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到 500 条消息。

所以,我的疑问是:

  1. 如果我将 max.poll.recors 配置为 50,那么如果我没有提交任何内容,Spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。
  2. Spring Kafka 有缓存吗?如果是的话,如果我在缓存中获取 100 万条记录而没有提交,这可能会是一个问题。

Kav*_*lai 1

消费者不提交抵消额只会在以下情况下产生影响:

  • 你的消费者在读了200条消息后崩溃了,当你重新启动它时,它会从0重新开始。
  • 您的消费者不再被分配分区。

因此,在完美的世界中,您根本不需要提交,它将消耗所有消息,因为消费者首先请求 1-50,然后请求 51-100。

但如果消费者崩溃了,没有人知道消费者读取的偏移量是多少。如果消费者已经提交了偏移量,那么当它重新启动时,它可以检查偏移主题以查看崩溃的消费者离开的位置并从那里开始。

max.poll.records定义一次要获取多少条记录,但没有定义要获取哪些记录。