Sending an object with Kafka/Spring Boot

Alp*_*dız 5 apache-kafka spring-boot kafka-consumer-api

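I just want to send a BankAccount object through a producer and then consume that object. I have both producer and consumer configurations, but I get an error.

My KafkaProducer configuration: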

@Configuration
public class KafkaProducerConfig {


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, BankAccount> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, BankAccount> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }



}

My KafkaConsumer configuration:

@Configuration
public class KafkaConsumerConfig {


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, BankAccount> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(BankAccount.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BankAccount> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BankAccount> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

It is really basic, but my controller is:

public class KafkaController {

    @Autowired
    private KafkaTemplate<String,BankAccount> kafkaTemplate;

    private static final String TOPIC = "my-topic";

    @GetMapping("/publish/{message}")
    public String postKafka(@PathVariable("message") final String message){
        BankAccount bankAccount= new BankAccount();
        bankAccount.setBalance(120.0);

        kafkaTemplate.send(TOPIC,bankAccount);
        return "Succesfully";
    }


}

The error is in KafkaConsumerConfig:

return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(BankAccount.class));

Here, on JsonDeserializer<>(BankAccount.class), the compiler reports: Diamond operator is not applicable for non-parameterized types
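For context, this message generally appears when `<>` is applied to a class that declares no type parameter. One possible cause, which is only an assumption since the imports are not shown above, is that `JsonDeserializer` resolves to a non-generic class such as `org.apache.kafka.connect.json.JsonDeserializer` rather than the generic `org.springframework.kafka.support.serializer.JsonDeserializer`. The sketch below reproduces the distinction with plain Java; `TypedDeserializer`, `UntypedDeserializer`, and `DiamondDemo` are hypothetical stand-ins, not Spring Kafka or Kafka classes.

```java
public class DiamondDemo {

    // Hypothetical stand-in for a generic deserializer: it declares a
    // type parameter T, so the diamond operator "<>" can infer it.
    static class TypedDeserializer<T> {
        private final Class<T> targetType;

        TypedDeserializer(Class<T> targetType) {
            this.targetType = targetType;
        }

        Class<T> getTargetType() {
            return targetType;
        }
    }

    // Hypothetical stand-in for a non-generic deserializer: it declares
    // no type parameter, so "new UntypedDeserializer<>()" is rejected.
    static class UntypedDeserializer {
        UntypedDeserializer() {
        }
    }

    // Minimal stand-in for the BankAccount class from the question.
    static class BankAccount {
        double balance;
    }

    public static void main(String[] args) {
        // Compiles: T is inferred as BankAccount from the declared type.
        TypedDeserializer<BankAccount> typed =
                new TypedDeserializer<>(BankAccount.class);
        System.out.println(typed.getTargetType().getSimpleName());

        // Compiles only without the diamond, because the class is not generic.
        UntypedDeserializer untyped = new UntypedDeserializer();
        System.out.println(untyped.getClass().getSimpleName());

        // Uncommenting the next line triggers the diamond-operator error:
        // UntypedDeserializer broken = new UntypedDeserializer<>();
    }
}
```

If the assumption holds, checking the import line of `JsonDeserializer` in KafkaConsumerConfig would be the first thing to verify.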

I did the same thing as in this reference: https://www.codenotfound.com/spring-kafka-json-serializer-deserializer-example.html

Can anyone help me?