如何在 Spring Kafka Stream 中配置 UncaughtExceptionHandler

ric*_*din 3 java spring-batch apache-kafka-streams spring-kafka

我正在spring-kafka使用 Spring Boot 1.5.16 来实现流应用程序。我们使用的版本spring-kafka是1.3.8.RELEASE。

我正在寻找一种方法来关闭启动应用程序,以防出现终止与 Kafka Streams 关联的所有线程的错误。我发现在内部KafkaStreams可以注册未捕获异常的句柄。方法是setGlobalStateRestoreListener

我看到这个方法是spring-kafka在 type内部公开的KStreamBuilderFactoryBean

我的问题如下。有没有一种简单的方法将 注册UncaughtExceptionHandler为 bean 并让 Spring 正确注入到工厂 bean 中?或者我应该创建KStreamBuilderFactoryBean自己的并手动设置处理程序?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}
Run Code Online (Sandbox Code Playgroud)

多谢。

Art*_*lan 5

是的。在那个旧版本中,您必须KStreamBuilderFactoryBean自己通过适当的注入来指定一个bean,并且准确地说KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME

在以后的版本中,我们已经StreamsBuilderFactoryBeanConfigurer保留了自动配置KStreamBuilderFactoryBean,但能够以我们需要的任何方式对其进行修改。

更新

您只需在应用程序上下文中将其创建为 bean,框架就会选择它并应用到StreamsBuilderFactoryBean

    @Bean
    StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return new StreamsBuilderFactoryBeanConfigurer() {

            @Override
            public void configure(StreamsBuilderFactoryBean factoryBean) {
                factoryBean.setCloseTimeout(...);
            }

            @Override
            public int getOrder() {
                return Integer.MAX_VALUE;
            }

        };
    }
Run Code Online (Sandbox Code Playgroud)

  • 我们将从 1.5.15 版本迁移到 Spring Boot 2.2.6。现在,我们了解了使用“StreamsBuilderFactoryBeanCustomizer”接口的解决方案的强大功能。感谢您的参考! (2认同)