等待 @KafkaListener 完成使用 @EmbeddedKafka 的测试中的消息处理

Nip*_*ple 5 mockito spring-kafka-test

我有一个@KafkaListener消费者,想编写集成测试。Consumer#consume事实是,在处理消息并且数据库中的某些状态发生变化后,似乎很难找到方法完成执行以执行某些断言的确切时刻。

@Component
public class Consumer {

    private final Service service;

    @KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
    public void consume(@Payload Message message, Acknowledgment acknowledgment) {
        service.process(message);
        acknowledgment.acknowledge();
    }

}
Run Code Online (Sandbox Code Playgroud)

测试

@SpringBootTest
@EmbeddedKafka
void class Testing {
    // some useful beans 

    @SpyBean
    private Consumer consumer;

    @Test
    void shoudConsume() throws Exception {
        Message message = new Message();
        String topic = "topic";
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        new KafkaProducer<String, String>(senderProps).send(new ProducerRecord<>(topic, message))
                .get(1L, TimeUnit.SECONDS);

        Mockito.verify(consumer, Mockito.timeout(1_000L)).consume(any(Message.class), any(Acknowledgment.class));
        // perform some asserts
    }
Run Code Online (Sandbox Code Playgroud)

事实是,如果我让Thread.sleep(1000L)消费者​​处理消息并且一切正常,但使用 Mockito 则不起作用,因为所有断言都在消费者完成方法执行之前执行Consumer#consume

是否有机会(使用侦听器等)捕获消费者确认/完成消息处理的时刻,@KafkaListener以使用适当的数据库状态执行断言?需要进行集成测试以确保端到端功能正常工作。

我还尝试对, method进行#verify检查,但它也不起作用。@SpyBean private Service serviceService#process

小智 0

使用下一个方法,您可以以 N 秒的间隔轮询 2 个主题的事件。您必须有足够的时间调用 fetchEventFromOutputTopic。我将它与 kafka 流一起使用,效果很好。

 private Map<String, List<Foo>> fetchEventFromOutputTopic(int seconds) throws Exception {
    Map<String, List<Foo>> result = new HashMap<>();
    result.put("topic-out-0", new ArrayList<>());
    result.put("topic-out-1", new ArrayList<>());

    int i = 0;
    while (i < seconds) {
        ConsumerRecords<String, Event> records = consumer.poll(Duration.ofSeconds(1));
        records.records("topic-out-0").forEach(record -> result.get("topic-out-0").add(record.value()));
        records.records("topic-out-1").forEach(record -> result.get("topic-out-1").add(record.value()));
        Thread.sleep(1000);
        i++;
    }
    return result;
}
Run Code Online (Sandbox Code Playgroud)