use*_*669 3 apache-kafka spring-boot
I am using Spring Boot with Apache Kafka.
Below is my controller code:
    package com.infy.controller;

    @RestController
    @RequestMapping(value = "/serving/")
    public class ServingRequestWebControllerImpl implements ServingRequestWebController {

        private static final Logger logger = LoggerFactory.getLogger(ServingRequestWebControllerImpl.class);

        @Autowired
        KafkaTemplate<String, TaskDetailsEntity> kafkaJsontemplate;

        String TOPIC_NAME = "sample-topic";

        @Override
        @PostMapping(value = "/produce", consumes = { "application/json" }, produces = { "application/json" })
        public String taskQueue(TaskDetailsEntity taskgDetailsEntity) {
            kafkaJsontemplate.send(TOPIC_NAME, taskgDetailsEntity);
            return "Serving Request Published Successfully To:- " + TOPIC_NAME;
        }
    }
Below is the Kafka configuration code:
    package com.infy.config;

    @Configuration
    public class KafkaConfig {

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory(config);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }

        public ProducerFactory<String, ServingDetailsEntity> producerFactoryServingDetail() {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory(config);
        }

        @Bean
        public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateServingDetailsListener() {
            return new KafkaTemplate<String, ServingDetailsEntity>(producerFactoryServingDetail());
        }

        @Bean
        public ConsumerFactory<String, ServingDetailsEntity> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                    new JsonDeserializer<>(ServingDetailsEntity.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity> kafkaTemplateTaskDetailsListener() {
            ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory<String, ServingDetailsEntity>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
Below is my consumer code:
    package com.infy.consumer;

    @Service
    public class KafkaConsumerService {

        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

        @Autowired
        private ServingService service;

        @KafkaListener(topics = "sample-topic", groupId = "event-group", containerFactory = "kafkaTemplateTaskDetailsListener")
        public void consumeTask(TaskDetailsEntity taskDtls) {
            System.out.println("Consumed Message:- \n " + taskDtls);
            TaskDetailsEntity upsert = service.taskUpsert(taskDtls);
            System.out.println(upsert.getId());
            logger.info("\n Exit KafkaConsumerService consumeTask");
        }
    }
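When I run my Spring Boot application, I get the following error:

    Description:

    Field kafkaJsontemplate in com.infy.controller.ServingRequestWebControllerImpl required a bean of type 'org.springframework.kafka.core.KafkaTemplate' that could not be found.

    The injection point has the following annotations:
        - @org.springframework.beans.factory.annotation.Autowired(required=true)

    The following candidates were found but could not be injected:
        - Bean method 'kafkaTemplate' in 'KafkaAutoConfiguration' not loaded because auto-configuration 'KafkaAutoConfiguration' was excluded

    Action:

    Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.KafkaTemplate' in your configuration.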
I tried adding the following setting to application.properties, but it did not help:
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Below is my Spring Boot main class:
    package com.infy;

    @SpringBootApplication
    public class SpringBootInitializer {

        public static void main(String[] args) {
            SpringApplication.run(SpringBootInitializer.class, args);
        }
    }
Could someone please help me find what I am doing wrong?
小智 12
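You are trying to autowire a bean of type KafkaTemplate<String, TaskDetailsEntity>, but KafkaConfig only declares beans of type KafkaTemplate<String, String> and KafkaTemplate<String, ServingDetailsEntity>. Spring takes the generic type arguments into account when resolving the injection point, so neither of those beans is a match.
It is probably also a mistake that producerFactory() carries the @Bean annotation while producerFactoryServingDetail() does not.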
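One way to provide the missing bean is sketched below. It is only an illustration under a few assumptions: the class name TaskDetailsKafkaConfig, the bean method names, and the com.infy.entity package for TaskDetailsEntity are hypothetical and do not appear in the question, so adjust them to the real project. The broker address and serializers are copied from the question's KafkaConfig.

```java
package com.infy.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Assumption: the real package of TaskDetailsEntity is not shown in the question.
import com.infy.entity.TaskDetailsEntity;

// Hypothetical configuration class; the beans could equally be added to KafkaConfig.
@Configuration
public class TaskDetailsKafkaConfig {

    // Producer factory typed for TaskDetailsEntity values, serialized as JSON.
    @Bean
    public ProducerFactory<String, TaskDetailsEntity> taskDetailsProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    // The declared generic type matches the controller's injection point:
    // KafkaTemplate<String, TaskDetailsEntity> kafkaJsontemplate
    @Bean
    public KafkaTemplate<String, TaskDetailsEntity> taskDetailsKafkaTemplate() {
        return new KafkaTemplate<>(taskDetailsProducerFactory());
    }
}
```

With a bean of this type in the context, the @Autowired field in ServingRequestWebControllerImpl should resolve without any change to the controller, since the match is made on the generic type rather than on the bean name.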