spring总是需要KafkaTemplate吗?

use*_*450 2 apache-kafka spring-boot spring-kafka

问题:springboot总是需要创建KafkaTemplate类型的bean吗?下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢

  1. 我一直在 Spring Boot 项目的某个主题中发布消息
  2. 为了创建回调机制,我使用了 org.apache.kafka.clients. Producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) 来发送消息并创建回调
  3. 我这样做的原因是因为使用 KafkaTemplate 时的可听未来仅提供失败异常(并且我想在所有用例中将回调注册为单独的可重用类)
  4. 但是,当我没有定义 KafkaTemplate 类型的 bean 并出现以下错误时,spring 无法启动
    引起:org.springframework.beans.factory.UnsatisfiedDependencyException:创建在类路径资源[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]中定义的名称为“kafkaTemplate”的bean时出错:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有“org.springframework.kafka.core.ProducerFactory”类型的合格 bean 可用:预计至少有 1 个有资格作为自动装配候选者的 bean。依赖注释:{}
        在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
        在org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration。(KafkaAnnotationDrivenConfiguration.java:90)〜[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
        在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法) ~[na:na]
        在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
        在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
        在 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
        在 org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
        ...省略22个常用帧
    引起原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的“org.springframework.kafka.core.ProducerFactory”类型的合格bean:预计至少有1个符合自动装配候选资格的bean。依赖注释:{}
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
        在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
        ...省略40个常用帧

我的卡夫卡配置如下

  @Configuration
public class KafkaEventConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${client.id}")
    private String clientId;


    @Value("${topic.movie.name}")
    private String movieTopicName;
    
    @Value("${retry.backoff.ms}")
    private int retryBackoffMilliseconds;

    @Value("${request.timeout.ms}")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, Movie> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private void populateCommonProperties(Map<String, Object> props) {
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    }
    
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() {
        return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
    }

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) {
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    }
Run Code Online (Sandbox Code Playgroud)

我的卡夫卡回调如下

       @Slf4j 
public class KafkaProducerCallBack<K, V> implements Callback {

    private ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {

        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        String topicName= metadata.topic();
        long offset= metadata.offset();
        
        if (exception != null) {

            log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
        }

        else {

            log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
            
        }

    }

}
Run Code Online (Sandbox Code Playgroud)

我像这样发布消息

movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
Run Code Online (Sandbox Code Playgroud)

请注意,当我在 KafkaEventConfig 中添加以下行时,一切正常

@Bean
    public KafkaTemplate<String, Movie> movieKafkaTemplate() {
        return new KafkaTemplate<String, Movie>(producerFactory());
    }
Run Code Online (Sandbox Code Playgroud)

小智 5

除了 @M.Deinum 提到的后者之外:

看看班级KafkaAutoConfiguration

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

Run Code Online (Sandbox Code Playgroud)

KafkaTemplate如果您不创建自己的bean,Springboot将为您创建一个bean。这个自动配置的 bean 依赖于ProducerFactory<Object, Object>bean,并且因为您声明了一个ProducerFactory<String, Movie>. 正如您所看到的类型不合适,这就是您收到错误的原因。


我这样做的原因是因为使用 KafkaTemplate 时的可听未来仅提供失败异常(并且我想在所有用例中将回调注册为单独的可重用类)

您的情况,您仍然可以获得使用的优势KafkaTemplateCallback您可以实现自己的并将ProducerListener<K, V>其绑定到您的KafkaTemple. 例如:

FullLoggingProducerListener.class

public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
    @Override
    public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
        log.info("Successful!");
    }

    @Override
    public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
        log.error("Error!");
    }
}
Run Code Online (Sandbox Code Playgroud)

YourConfigration.class

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        return kafkaTemplate;
    }
Run Code Online (Sandbox Code Playgroud)

现在,每次您使用KafkaTemplate发送记录时,您都会看到日志。