Jon*_*lli 6 apache-kafka apache-kafka-streams
我有一个 Kafka Streams 应用程序,每当我重新启动它时,它正在使用的主题的偏移量都会重置。因此,对于所有分区,延迟都会增加,应用程序需要重新处理所有数据。
更新: 输出主题正在接收在应用程序重新启动后已经处理的突发事件,并不是我在上一段中所说的输入主题偏移量正在重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移量正在重置,请参阅下面的评论。
我已经确保在重新启动之前每个分区的延迟为 1(这是针对输出主题的)。属于该消费者组 ID (app-id) 的所有消费者都处于活动状态。重新启动是立即的,大约需要 30 秒。
该应用程序仅使用一次作为处理保证。
我已经阅读了这个答案Apache Kafka 消费者组的偏移量如何过期?.
我试过auto.offset.reset = latest和auto.offset.reset = early。
似乎这些主题的偏移量并未有效提交,(但我不确定这一点)。
我假设在重新启动后,应用程序应该从该消费者组的最新提交偏移量中提取。
更新: 我假设这是内部主题(KTABLE-SUPPRESS-STATE-STORE)
Kafka Stream API 是否确保在关闭之前提交所有消耗的偏移量?(在调用streams.close() 之后)
我真的很感激任何关于这方面的线索。
更新:
这是应用程序执行的代码:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
.stream(inputTopicNames, Consumed.with(..., ...)
.withTimestampExtractor(...);
events
.filter((k, v) -> ...)
.flatMapValues(v -> ...)
.flatMapValues(v -> ...)
.selectKey((k, v) -> v)
.groupByKey(Grouped.with(..., ...))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
.advanceBy(Duration.ofSeconds(windowSizeInSecs))
.grace(Duration.ofSeconds(windowSizeGraceInSecs)))
.reduce((agg, new) -> {
...
return agg;
})
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()))
.toStream()
.to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
Run Code Online (Sandbox Code Playgroud)
偏移量重置总是(在重新启动后)与由 Kafka Stream API 创建的KTABLE-SUPPRESS-STATE-STORE内部主题一起发生。
我已经尝试过一次并且至少一次使用处理保证。
再一次,我将非常感谢任何有关此的线索。
更新: 这已在2.2.1版中解决(https://issues.apache.org/jira/browse/KAFKA-7895)
归档时间: |
|
查看次数: |
1764 次 |
最近记录: |