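I have a @KafkaListener consumer and want to write an integration test for it. The problem is that it seems hard to find the exact moment when Consumer#consume has finished executing, so that I can run some assertions once the message has been processed and some state in the database has changed.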
@Component
public class Consumer {
    private final Service service;

    // Constructor injection, so the final field is initialized.
    public Consumer(Service service) {
        this.service = service;
    }

    @KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
    public void consume(@Payload Message message, Acknowledgment acknowledgment) {
        service.process(message);
        acknowledgment.acknowledge();
    }
}
The test:
@SpringBootTest
@EmbeddedKafka
class Testing {
    // some useful beans

    @SpyBean
    private Consumer consumer;

    @Test
    void shouldConsume() throws Exception {
        Message message = new Message();
        String topic = "topic";
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        // The value type is Message, so senderProps needs a value serializer that can handle it.
        new KafkaProducer<String, Message>(senderProps).send(new ProducerRecord<>(topic, message))
                .get(1L, TimeUnit.SECONDS); …
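
To make the timing problem concrete, here is a minimal sketch of one common way to wait for an asynchronous handler: wrap it so that it counts down a CountDownLatch when it returns, and let the test thread block on that latch with a timeout. It uses only the JDK and no Spring or Kafka. The names LatchWaitSketch, SignallingHandler, and deliverAndAwait are made up for illustration, the single-thread executor merely stands in for the listener container thread, and the AtomicReference stands in for the database. In the real test the same idea might be wired onto the @SpyBean with Mockito's doAnswer (call the real method, then count down), but I have not verified that against this setup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class LatchWaitSketch {

    /** Hypothetical wrapper: runs the real handler, then signals completion. */
    public static final class SignallingHandler<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final CountDownLatch done;

        public SignallingHandler(Consumer<T> delegate, CountDownLatch done) {
            this.delegate = delegate;
            this.done = done;
        }

        @Override
        public void accept(T message) {
            try {
                delegate.accept(message);
            } finally {
                // Count down even if the handler throws, so the waiting
                // thread is released instead of sitting out the full timeout.
                done.countDown();
            }
        }
    }

    /**
     * Delivers the message on a separate thread (a stand-in for the listener
     * thread) and waits for the handler to finish. Returns the stored state,
     * or null if the handler did not finish within the timeout.
     */
    public static String deliverAndAwait(String message, long timeoutMillis) {
        AtomicReference<String> fakeDatabase = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Consumer<String> handler = new SignallingHandler<String>(
                m -> fakeDatabase.set("processed:" + m), done);

        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            listenerThread.submit(() -> handler.accept(message));
            // Block the "test" thread until the handler has returned.
            boolean finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? fakeDatabase.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            listenerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndAwait("hello", 1000));
    }
}
```

Assertions on the changed state belong after the await call returns true. If await returns false, the handler did not finish in time and the test should fail at that point rather than go on to read the state.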