ric*_*din 3 java spring-batch apache-kafka-streams spring-kafka
我正在spring-kafka使用 Spring Boot 1.5.16 来实现流应用程序。我们使用的版本spring-kafka是1.3.8.RELEASE。
我正在寻找一种方法来关闭启动应用程序,以防出现终止与 Kafka Streams 关联的所有线程的错误。我发现在内部KafkaStreams可以注册未捕获异常的句柄。方法是setGlobalStateRestoreListener。
我看到这个方法是spring-kafka在 type内部公开的KStreamBuilderFactoryBean。
我的问题如下。有没有一种简单的方法将 注册UncaughtExceptionHandler为 bean 并让 Spring 正确注入到工厂 bean 中?或者我应该创建KStreamBuilderFactoryBean自己的并手动设置处理程序?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
streamsConfig);
streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
// Something happens here
});
return streamBuilderFactoryBean;
}
Run Code Online (Sandbox Code Playgroud)
多谢。
是的。在那个旧版本中,您必须KStreamBuilderFactoryBean自己通过适当的注入来指定一个bean,并且准确地说KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME。
在以后的版本中,我们已经StreamsBuilderFactoryBeanConfigurer保留了自动配置KStreamBuilderFactoryBean,但能够以我们需要的任何方式对其进行修改。
更新
您只需在应用程序上下文中将其创建为 bean,框架就会选择它并应用到StreamsBuilderFactoryBean:
@Bean
StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
factoryBean.setCloseTimeout(...);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3043 次 |
| 最近记录: |