将 StateRestoreListener 与 Spring Cloud Kafka Streams 绑定器结合使用

Che*_*lin 1 java spring apache-kafka spring-cloud-stream apache-kafka-streams

我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行

是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢

sob*_*cko 5

您可以通过使用StreamsBuilderFactoryBeanCustomizer可访问底层KafkaStreams对象的 a 来做到这一点。如果您使用活页夹版本 3.0 或更高版本,这是推荐的方法。例如,您可以bean在应用程序中提供以下内容并使用GlobalStateRestoreListener.

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setGlobalStateRestoreListener(...);
            }
        });
    };
}
Run Code Online (Sandbox Code Playgroud)

该博客提供了有关此策略的更多详细信息。