Che*_*lin 1 java spring apache-kafka spring-cloud-stream apache-kafka-streams
我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。
为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。
第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)
第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误
java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行
是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢
您可以通过使用StreamsBuilderFactoryBeanCustomizer可访问底层KafkaStreams对象的 a 来做到这一点。如果您使用活页夹版本 3.0 或更高版本,这是推荐的方法。例如,您可以bean在应用程序中提供以下内容并使用GlobalStateRestoreListener.
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setGlobalStateRestoreListener(...);
}
});
};
}
Run Code Online (Sandbox Code Playgroud)
该博客提供了有关此策略的更多详细信息。
| 归档时间: |
|
| 查看次数: |
1338 次 |
| 最近记录: |