我们的要求是,如果 kafka-stream 应用程序正在消耗一个分区,它应该从该分区的最新偏移量开始消耗。
这似乎可以使用
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
Run Code Online (Sandbox Code Playgroud)
现在,假设使用上述配置,kafka-stream 应用程序开始从分区的最新偏移量开始使用数据。一段时间后,应用程序崩溃。当应用程序重新上线时,我们希望它使用该分区最新偏移量的数据,而不是它上次读取时离开的位置。
但是我找不到任何可以帮助使用 kafka-streams api 实现它的东西。
PS 我们使用的是 kafka-1.0.0。