我正在编写一个消费者,它监听 Kafka 主题并在消息可用时消费消息。我已经通过在本地运行 Kafka 测试了逻辑/代码,并且工作正常。
在编写单元/组件测试用例时,它因 avro 架构注册表 url 错误而失败。我尝试过互联网上提供的不同选项,但找不到任何有效的方法。我不确定我的方法是否正确。请帮忙。
听众班
@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
try {
GenericRecord generic = consumerRecord.value();
Object obj = generic.get("metadata");
ObjectMapper mapper = new ObjectMapper();
Header headerMetaData = mapper.readValue(obj.toString(), Header.class);
System.out.println("Received payload : " + consumerRecord.value());
//Call backend with details in GenericRecord
}catch (Exception e){
System.out.println("Exception while reading message from Kafka " + e );
}
Run Code Online (Sandbox Code Playgroud)
卡夫卡配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = …Run Code Online (Sandbox Code Playgroud) java junit apache-kafka spring-boot confluent-schema-registry