小编Nip*_*ple的帖子

等待 @KafkaListener 完成使用 @EmbeddedKafka 的测试中的消息处理

我有一个@KafkaListener消费者,想编写集成测试。Consumer#consume事实是,在处理消息并且数据库中的某些状态发生变化后,似乎很难找到方法完成执行以执行某些断言的确切时刻。

@Component
public class Consumer {

    private final Service service;

    @KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
    public void consume(@Payload Message message, Acknowledgment acknowledgment) {
        service.process(message);
        acknowledgment.acknowledge();
    }

}
Run Code Online (Sandbox Code Playgroud)

测试

@SpringBootTest
@EmbeddedKafka
void class Testing {
    // some useful beans 

    @SpyBean
    private Consumer consumer;

    @Test
    void shoudConsume() throws Exception {
        Message message = new Message();
        String topic = "topic";
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        new KafkaProducer<String, String>(senderProps).send(new ProducerRecord<>(topic, message))
                .get(1L, TimeUnit.SECONDS); …
Run Code Online (Sandbox Code Playgroud)

mockito spring-kafka-test

5
推荐指数
1
解决办法
8620
查看次数

标签 统计

mockito ×1

spring-kafka-test ×1