我正在使用 Kafka Streams 并遇到了一种对我来说似乎有点奇怪的情况。我确实对此进行了搜索,但找不到正确的答案。在处理流中的元素时,如果发生任何未处理的异常,则会发生以下错误并且 Kafka Streams 无法恢复,停止处理更多消息,然后我需要重新启动我的应用程序。
2018-05-17 11:00:51.304 INFO 43810 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [snapshot-stream-application-a1dbc5d4-1742-4af2-9919-d4f5f861bb36-StreamThread-3] Stream thread shutdown complete
2018-05-17 11:00:51.304 WARN 43810 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [snapshot-stream-application-a1dbc5d4-1742-4af2-9919-d4f5f861bb36-StreamThread-3] Unexpected state transition from RUNNING to DEAD.
2018-05-17 11:00:51.305 ERROR 43810 --- [-StreamThread-3] c.u.kafkastreams.config.SnapshotStream : Error while processing:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_4] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
Run Code Online (Sandbox Code Playgroud)
捕获所有异常肯定会解决问题,但我正在为 Kafka Streams 寻找合适的异常处理机制。这是我的示例代码:
private KafkaStreams streams;
@PostConstruct
public void runStreams() {
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> snapshotTable = builder.table(kafkaProperties.getInputTopic()); …Run Code Online (Sandbox Code Playgroud)