小编Jon*_*lli的帖子

为什么我的 Kafka Streams 应用程序的消费者组 (app-id) 的偏移量会在应用程序重启后重置?

我有一个 Kafka Streams 应用程序,每当我重新启动它时,它正在使用的主题的偏移量都会重置。因此,对于所有分区,延迟都会增加,应用程序需要重新处理所有数据。

更新: 输出主题正在接收在应用程序重新启动后已经处理的突发事件,并不是我在上一段中所说的输入主题偏移量正在重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移量正在重置,请参阅下面的评论。

我已经确保在重新启动之前每个分区的延迟为 1(这是针对输出主题的)。属于该消费者组 ID (app-id) 的所有消费者都处于活动状态。重新启动是立即的,大约需要 30 秒。

该应用程序仅使用一次作为处理保证。

我已经阅读了这个答案Apache Kafka 消费者组的偏移量如何过期?.

我试过auto.offset.reset = latestauto.offset.reset = early

似乎这些主题的偏移量并未有效提交,(但我不确定这一点)。

我假设在重新启动后,应用程序应该从该消费者组的最新提交偏移量中提取。

更新: 我假设这是内部主题(KTABLE-SUPPRESS-STATE-STORE

Kafka Stream API 是否确保在关闭之前提交所有消耗的偏移量?(在调用streams.close() 之后

我真的很感激任何关于这方面的线索。

更新

这是应用程序执行的代码:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...)) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

6
推荐指数
1
解决办法
1764
查看次数

存储在Apache Flink中

在处理了数以百万计的事件/数据之后,哪里有存储信息的最佳位置,以说有价值可以节省数百万个事件?我看到提交 Parquet格式的提交已关闭拉取请求,但是,默认是HDFS?我关心的是保存(在哪里?)如果检索数据很容易(快!)?

apache-flink

4
推荐指数
1
解决办法
1297
查看次数