如何始终使用 kafka-streams 中的最新偏移量

Sal*_*ani 7 apache-kafka apache-kafka-streams

我们的要求是,如果 kafka-stream 应用程序正在消耗一个分区,它应该从该分区的最新偏移量开始消耗。

这似乎可以使用

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
Run Code Online (Sandbox Code Playgroud)

现在,假设使用上述配置,kafka-stream 应用程序开始从分区的最新偏移量开始使用数据。一段时间后,应用程序崩溃。当应用程序重新上线时,我们希望它使用该分区最新偏移量的数据,而不是它上次读取时离开的位置。

但是我找不到任何可以帮助使用 kafka-streams api 实现它的东西。

PS 我们使用的是 kafka-1.0.0。

Mat*_*Sax 8

这不支持开箱即用。

配置auto.offset.reset只有触发器,如果没有坚定的偏移,也没有配置改变这种行为。

你可以使用启动之前手动操纵偏移bin/kafka-consumer-groups.sh虽然-的application.idgroup.id和你可以“寻求结束”重新启动应用程序之前。

更新:

从 1.1.0 版本开始,您可以使用bin/kafka-streams-application-reset.sh工具来设置起始偏移。要使用该工具,应用程序必须处于离线状态。(参见:https : //cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application