从 Kafka 存储变更日志读取时发生 OffsetOutOfRangeException

kel*_*ket 1 apache-kafka apache-kafka-streams

我有一个 Kafka Streams 应用程序,它正在从商店更改日志中读取数据,偶尔会抛出此错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Run Code Online (Sandbox Code Playgroud)

我认为消费者应该默认latest。即使我尝试使用 或 来配置我的流属性ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatestearliest仍然看到此错误。为什么?

Mat*_*Sax 5

配置消费者重置策略只能用于读取实际输入主题。

对于变更日志主题(即恢复情况),重置策略始终设置为none内部,因为 Kafka Streams 需要手动处理这种情况。异常被捕获并记录为警告级别消息。之后,Kafka Streams 会进行一些内部清理并手动#seekToBeginning()重新启动恢复过程。

没有理由担心这一点。但是,会记录一条 WARN 消息来通知您有关该事件的信息。