flo*_*ong 6 java unit-testing junit5 apache-kafka-streams
我正在尝试使用 JUnit5 和 EmbededKafka 测试 KafkaStreams 应用程序。执行任何测试用例时,我看到日志被以下消息轰炸
[Producer clientId=myTask-227e2e90-212b-4663-bd17-2d307018c81a-StreamThread-1-producer] Connection to node 0 (localhost/127.0.0.1:61269) could not be established. Broker may not be available.
[Consumer clientId=myTask-227e2e90-212b-4663-bd17-2d307018c81a-StreamThread-3-consumer, groupId=my-task] Connection to node 0 (localhost/127.0.0.1:61269) could not be established. Broker may not be available.
Run Code Online (Sandbox Code Playgroud)
我还添加了
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
Run Code Online (Sandbox Code Playgroud)
类级别配置
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@TestPropertySource("/application-test.properties")
@EmbeddedKafka(topics = { "${kafka.topic.}"
},
partitions = 1, controlledShutdown = true
)
@ExtendWith(MockitoExtension.class)
public class KStreamTest{
@Autowired
protected EmbeddedKafkaBroker embeddedKafka;
//TestMethod
}
Run Code Online (Sandbox Code Playgroud)
你能建议我尝试什么或者如果我错过了什么吗?
我的生产者在多个测试用例之间也遇到了类似的问题。问题是,所ProdcuerFactory使用的KafkaTemplate没有注册为 Bean,因此没有被@DirtiesContext-Annotation 破坏。
所以你的 Bean 定义应该是这样的:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// ...
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
702 次 |
| 最近记录: |