嵌入式 Kafka Spring 测试在嵌入式 Kafka 准备就绪之前执行

Gee*_*son 6 java apache-kafka spring-boot spring-kafka embedded-kafka

我有一个 Spring Boot 项目,其中有一个 Kafka 侦听器,我想使用嵌入式 Kafka 对其进行测试。我让卡夫卡监听器注销消息“收到记录”。Thread.sleep(1000)只有当我在方法的开头添加 a 时,才会被注销。

测试类:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}
Run Code Online (Sandbox Code Playgroud)

我不想使用变化无常的Thread.sleep()测试显然是在某些设置过程完成之前执行的。我显然需要等待某件事,但我不确定什么或如何做。

使用:

  • 爪哇11
  • 春季启动2.5.6
  • J单元5
  • 弹簧卡夫卡测试 2.7.8

Gar*_*ell 5

将一个@EventListenerbean 添加到测试上下文中,并(例如)在收到CountDownLatcha 时对 a 进行倒计时;ConsumerStartedEvent然后在测试中

assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
Run Code Online (Sandbox Code Playgroud)

请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumation

或者添加一个ConsumerRebalanceListener并等待分区分配。

  • 经过进一步考虑,重新平衡监听器是一个更好的选择,因为在消费者启动和分配分区之间仍然存在一个小的竞争条件。等待分配实用程序方法的工作原理类似。另外,对消费者设置 auto.offset.reset=earliest 应该会有所帮助。 (2认同)

Art*_*ich 0

我显然需要等待某件事,但我不确定什么或如何做。

您需要使用不同的方法来留出Kafka时间来处理和路由消息......

看看这一行...

ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
Run Code Online (Sandbox Code Playgroud)

在测试 Kafka 侦听器时,我们总是指定轮询延迟。这是因为您的消息被发送给 kafka,然后 kafka 将在另一个线程中处理它。你需要等待。

这是它在所使用的代码上下文中的外观。

class UserKafkaProducerTest {
  @Test
  void testWriteToKafka() throws InterruptedException, JsonProcessingException {
      // Create a user and write to Kafka
      User user = new User("11111", "John", "Wick");
      producer.writeToKafka(user);

      // Read the message (John Wick user) with a test consumer from Kafka and assert its properties
      ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
      assertNotNull(message);
      assertEquals("11111", message.key());
      User result = objectMapper.readValue(message.value(), User.class);
      assertNotNull(result);
      assertEquals("John", result.getFirstName());
      assertEquals("Wick", result.getLastName());
  }
}
Run Code Online (Sandbox Code Playgroud)

这是本文中的一段代码,它使事情变得清晰。