小编Che*_*lin的帖子

将 StateRestoreListener 与 Spring Cloud Kafka Streams 绑定器结合使用

我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行

是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢

java spring apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
1338
查看次数