Spring Kafka:轮询新消息而不是使用 `onMessage` 通知

C-O*_*tto 3 spring apache-kafka kafka-consumer-api spring-kafka

我在我的项目中使用 Spring Kafka,因为在基于 Spring 的项目中使用 Kafka 消息似乎是一个自然的选择。要消费消息,我可以使用该MessageListener接口。Spring Kafka 在内部负责onMessage为每条新消息调用我的方法。

但是,在我的设置中,我更喜欢显式轮询新消息并按顺序处理它们(这将需要几秒钟)。作为一种解决方法,我可能只是在我的onMessage实现中阻塞,或者在内部缓冲消息。然而,这似乎与 Spring Kafka 的核心思想背道而驰。

Kafka 的设计使消费者必须轮询符合我要求的新消息。有没有办法在 Spring Kafka 中使用这种“自然”的工作流程?

对于这个用例,我应该避免使用 Spring Kafka 吗?

KafkaConsumer文件指出:

对于消息处理时间变化不可预测的用例,这些选项都可能不够。处理这些情况的推荐方法是将消息处理移动到另一个线程,这允许消费者在处理器仍在工作时继续调用 poll。必须注意确保提交的偏移量不会超过实际位置。通常,您必须禁用自动提交并仅在线程完成处理记录后手动提交记录的处理偏移量(取决于您需要的交付语义)。另请注意,您需要暂停分区,以便在线程完成处理先前返回的记录之前不会从轮询接收新记录。

相关问题:https : //github.com/spring-projects/spring-kafka/issues/195

Gar*_*ell 8

必须继续轮询消费者的问题现已解决(在 0.10.1.x 中由KIP-62),因此这不再是问题(只要您不超过max.poll.interval.ms),默认情况下为 5 分钟,但可以增加。

然而,如果你想轮询自己,你仍然可以使用 spring-kafka(例如,如果你使用 Boot 来获得 Spring Boot 自动配置的好处),但你可以直接ConsumerDefaultKafkaConsumerFactoryand 中获取 a poll()