今天,在我的Spring Boot和单实例Kafka应用程序中,我遇到了以下问题:
org.apache.kafka.clients.consumer.CommitFailedException:由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。这意味着后续调用poll()之间的时间比配置的max.poll.interval.ms更长,这通常意味着轮询循环在消息处理上花费了太多时间。您可以通过增加会话超时或通过使用max.poll.records减小poll()中返回的批处理的最大大小来解决此问题。
这可能是什么原因以及如何解决?据我了解-我的消费者被封锁了很长时间,对心跳没有反应。我应该调整Kafka属性以解决该问题。您能否告诉我应该调整哪些确切属性,例如在Kafka端或我的应用程序Spring Kafka端?
我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。