Pet*_*zov 4 java spring apache-kafka spring-boot spring-kafka
我对 Kafka 有以下 Spring 配置:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that builds Kafka producer beans from values bound
 * under the {@code kafka.status.producer} property prefix.
 *
 * <p>The class itself is annotated with {@code @RefreshScope}; of the bean
 * methods, only {@link #kafkaTemplate()} carries that annotation as well.
 */
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "kafka.status.producer")
public class StatusKafkaProducerConfig {
// These fields have no initializers and are assigned only through the setters
// below, so each one stays null until its setter has been called.
private String keySerializer;
private String valueSerializer;
private String bootstrapServers;
/** Sets the value stored under {@code ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}. */
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
/** Sets the value stored under {@code ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}. */
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
/** Sets the value stored under {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}. */
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
/**
 * Collects the three configured values into a new map keyed by the
 * corresponding {@code ProducerConfig} constants.
 *
 * <p>Values are inserted as-is without any null check, so a field that was
 * never set ends up as a null map value.
 *
 * @return a new mutable map holding the producer settings
 */
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return properties;
}
/**
 * Creates a {@code KafkaTemplate} backed by {@link #producerFactory()}.
 *
 * @return a new template for String keys and String values
 */
@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
 * Creates a {@code DefaultKafkaProducerFactory} from {@link #producerConfigs()}.
 *
 * <p>NOTE(review): the reported startup failure is a NullPointerException
 * raised inside this constructor call while it copies the map into a
 * ConcurrentHashMap - presumably because a map value is null; confirm that
 * the three properties are actually bound before this method runs.
 *
 * @return a new producer factory for String keys and String values
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
Run Code Online (Sandbox Code Playgroud)
我将项目迁移到最新的Spring cloud 2021.0.1和Spring Boot 2.6.6。但我在启动时收到以下错误堆栈:
2022-04-30 19:14:08.350 ERROR 23684 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'producerFactory' defined in class path resource [com/test/StatusUpdateKafkaProducerConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.core.ProducerFactory]: Factory method 'producerFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:658) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:486) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.19.jar:5.3.19]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.19.jar:5.3.19]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.7.jar:2.6.7]
at com.test.Application.main(Application.java:82) ~[main/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.core.ProducerFactory]: Factory method 'producerFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653) ~[spring-beans-5.3.19.jar:5.3.19]
... 19 common frames omitted
Caused by: java.lang.NullPointerException: null
at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[na:na]
at java.base/java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1089) ~[na:na]
at java.base/java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.<init>(DefaultKafkaProducerFactory.java:205) ~[spring-kafka-2.8.5.jar:2.8.5]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.<init>(DefaultKafkaProducerFactory.java:168) ~[spring-kafka-2.8.5.jar:2.8.5]
at com.test.StatusUpdateKafkaProducerConfig.producerFactory(TransactionStatusUpdateKafkaProducerConfig.java:53) ~[commons-0.0.1-plain.jar:na]
at com.test.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88.CGLIB$producerFactory$0(<generated>) ~[commons-0.0.1-plain.jar:na]
at com.test.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88$$FastClassBySpringCGLIB$$db10662a.invoke(<generated>) ~[commons-0.0.1-plain.jar:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.3.19.jar:5.3.19]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.3.19.jar:5.3.19]
at com.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88.producerFactory(<generated>) ~[commons-0.0.1-plain.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) ~[spring-core-5.3.19.jar:5.3.19]
at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:485) ~[spring-cloud-context-3.1.2.jar:3.1.2]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar:5.3.19]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.19.jar:5.3.19]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.19.jar:5.3.19]
at com.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$6800bd04.producerFactory(<generated>) ~[commons-0.0.1-plain.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.3.19.jar:5.3.19]
... 20 common frames omitted
Run Code Online (Sandbox Code Playgroud)
你知道我该如何解决这个问题吗?
错误堆栈跟踪表明,当您在 producerFactory() 方法中创建新的 DefaultKafkaProducerFactory 实例时发生了 NullPointerException。
在内部,DefaultKafkaProducerFactory 使用提供的信息创建一个 ConcurrentHashMap 来维护其配置。
正如您在 ConcurrentHashMap 的 javadocs 中看到的,如果提供的任何键或值为 null,其 put 方法将引发 NullPointerException。
因此,keySerializer、valueSerializer 或 bootstrapServers 这些变量中可能有一个或多个为 null。
我假设这些值是根据您的 @ConfigurationProperties 前缀 kafka.status.producer 绑定的。请确保这些属性在您的 Spring 配置中正确定义。
出于测试目的,请尝试删除注释@RefreshScope:它可能是导致问题的原因。
相反的也可能是一个有效的解决方案。正如 Spring Cloud 文档中在谈论@RefreshScope时所指出的:
@RefreshScope(技术上)可以用于 @Configuration 类,但它可能会导致令人惊讶的行为。例如,这并不意味着该类中定义的所有 @Bean 本身都处于 @RefreshScope 中。具体来说,任何依赖于这些 bean 的东西都不能指望它们在触发刷新时得到更新,除非它本身也处于 @RefreshScope 中。在这种情况下,它会在刷新时重建,并重新注入其依赖项;此时,这些依赖项将从刷新后的 @Configuration 中重新初始化。
从这个角度来看,给 producerFactory bean 也加上 @RefreshScope 注解是完全有意义的。我会尝试:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that builds Kafka producer beans from values bound
 * under the {@code kafka.status.producer} property prefix.
 *
 * <p>Both the class and every bean method are annotated with
 * {@code @RefreshScope}.
 */
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "kafka.status.producer")
public class StatusKafkaProducerConfig {

    // Assigned only through the setters below; each stays null until its setter is called.
    private String keySerializer;
    private String valueSerializer;
    private String bootstrapServers;

    /** Sets the value stored under {@code ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}. */
    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    /** Sets the value stored under {@code ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}. */
    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    /** Sets the value stored under {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}. */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /**
     * Collects the three configured values into a new map keyed by the
     * corresponding {@code ProducerConfig} constants.
     *
     * <p>Each value is checked first, so an unset property is reported by name
     * here instead of being passed on as a null map value.
     *
     * @return a new mutable map holding the producer settings
     * @throws IllegalStateException if any of the three values is {@code null}
     */
    @Bean
    @RefreshScope
    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                requireProperty("kafka.status.producer.key-serializer", keySerializer));
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                requireProperty("kafka.status.producer.value-serializer", valueSerializer));
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                requireProperty("kafka.status.producer.bootstrap-servers", bootstrapServers));
        return properties;
    }

    /**
     * Creates a {@code KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return a new template for String keys and String values
     */
    @Bean
    @RefreshScope
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates a {@code DefaultKafkaProducerFactory} from {@link #producerConfigs()}.
     *
     * @return a new producer factory for String keys and String values
     */
    @Bean
    @RefreshScope
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Returns {@code value} unchanged, or fails with a message naming the property.
     *
     * @param propertyName full property name used in the error message
     * @param value the bound value to check
     * @return {@code value}, guaranteed non-null
     * @throws IllegalStateException if {@code value} is {@code null}
     */
    private static String requireProperty(String propertyName, String value) {
        if (value == null) {
            throw new IllegalStateException(
                    "Required Kafka producer property '" + propertyName + "' is not set");
        }
        return value;
    }
}
Run Code Online (Sandbox Code Playgroud)
请注意其中额外添加的 @RefreshScope 注解。
最后,考虑查看 Spring Kafka 项目文档:无论您是否使用 Spring Boot,它都提供了几个有关如何配置项目的示例,并提供了有关 DefaultKafkaProducerFactory 的更多信息。
正如您在上述文档中看到的,如果您使用 Spring Boot,就像您的情况一样,代码可以大大简化。考虑查看有关主题和相关属性配置的Spring Boot相关文档。
| 归档时间: |
|
| 查看次数: |
23557 次 |
| 最近记录: |