Alp*_*dız 5 apache-kafka spring-boot kafka-consumer-api
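I just want to send a BankAccount object through the producer and then consume that object. I have both a producer and a consumer configuration, but I get an error.
My KafkaProducer configuration: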
    @Configuration
    public class KafkaProducerConfig {

        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }

        @Bean
        public ProducerFactory<String, BankAccount> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, BankAccount> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
My KafkaConsumer configuration:
    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }

        @Bean
        public ConsumerFactory<String, BankAccount> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(
                    consumerConfigs(),
                    new StringDeserializer(),
                    new JsonDeserializer<>(BankAccount.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, BankAccount> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, BankAccount> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
It is really basic, but here is my controller:
    public class KafkaController {

        @Autowired
        private KafkaTemplate<String, BankAccount> kafkaTemplate;

        private static final String TOPIC = "my-topic";

        @GetMapping("/publish/{message}")
        public String postKafka(@PathVariable("message") final String message) {
            BankAccount bankAccount = new BankAccount();
            bankAccount.setBalance(120.0);
            kafkaTemplate.send(TOPIC, bankAccount);
            return "Succesfully";
        }
    }
The error is in KafkaConsumerConfig:
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(BankAccount.class));
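Here, at JsonDeserializer<>(BankAccount.class), the compiler reports: "Diamond operator is not applicable for non-parameterized types".
I did the same thing as in this reference link: https://www.codenotfound.com/spring-kafka-json-serializer-deserializer-example.html
Can anyone help me?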
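
For context on the error message: in Java, the diamond operator can only be used when the class being instantiated declares type parameters. The sketch below illustrates that rule with two hypothetical classes, TypedDeserializer<T> and UntypedDeserializer, which are not part of Spring Kafka or Apache Kafka. The post does not show its import statements, so it is only an assumption that a non-generic class named JsonDeserializer from another library was imported instead of the generic org.springframework.kafka.support.serializer.JsonDeserializer.

```java
public class DiamondDemo {

    // Hypothetical generic deserializer: it declares a type parameter T,
    // so "new TypedDeserializer<>(...)" is legal and T is inferred.
    static class TypedDeserializer<T> {
        private final Class<T> targetType;

        TypedDeserializer(Class<T> targetType) {
            this.targetType = targetType;
        }

        String describe() {
            return "deserializes to " + targetType.getSimpleName();
        }
    }

    // Hypothetical non-generic deserializer: no type parameter is declared.
    static class UntypedDeserializer {
        String describe() {
            return "deserializes to an untyped tree";
        }
    }

    // Stand-in for the BankAccount class from the question.
    static class BankAccount { }

    public static void main(String[] args) {
        // Compiles: the class is parameterized, T is inferred as BankAccount.
        TypedDeserializer<BankAccount> typed = new TypedDeserializer<>(BankAccount.class);
        System.out.println(typed.describe());

        // Compiles: a non-generic class is instantiated without the diamond.
        UntypedDeserializer untyped = new UntypedDeserializer();
        System.out.println(untyped.describe());

        // Would NOT compile (diamond on a non-parameterized type):
        // UntypedDeserializer broken = new UntypedDeserializer<>();
    }
}
```

If the same message appears for JsonDeserializer, comparing the import line in KafkaConsumerConfig with the one used in the linked tutorial may be a reasonable first check.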