Gee*_*son 6 java apache-kafka spring-boot spring-kafka embedded-kafka
我有一个 Spring Boot 项目,其中有一个 Kafka 侦听器,我想使用嵌入式 Kafka 对其进行测试。我让卡夫卡监听器注销消息“收到记录”。Thread.sleep(1000)
只有当我在方法的开头添加 a 时,才会被注销。
测试类:
@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {
private static final String TOPIC = "my-topic";
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Test
void testSendEvent() throws ExecutionException, InterruptedException {
// Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
Producer<Integer, String> producer = configureProducer();
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
producer.send(producerRecord).get();
producer.close();
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
Run Code Online (Sandbox Code Playgroud)
我不想使用变化无常的Thread.sleep()
测试显然是在某些设置过程完成之前执行的。我显然需要等待某件事,但我不确定什么或如何做。
使用:
将一个@EventListener
bean 添加到测试上下文中,并(例如)在收到CountDownLatch
a 时对 a 进行倒计时;ConsumerStartedEvent
然后在测试中
assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
Run Code Online (Sandbox Code Playgroud)
请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#events
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumation
或者添加一个ConsumerRebalanceListener
并等待分区分配。
我显然需要等待某件事,但我不确定什么或如何做。
您需要使用不同的方法来留出Kafka
时间来处理和路由消息......
看看这一行...
ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
Run Code Online (Sandbox Code Playgroud)
在测试 Kafka 侦听器时,我们总是指定轮询延迟。这是因为您的消息被发送给 kafka,然后 kafka 将在另一个线程中处理它。你需要等待。
这是它在所使用的代码上下文中的外观。
class UserKafkaProducerTest {
@Test
void testWriteToKafka() throws InterruptedException, JsonProcessingException {
// Create a user and write to Kafka
User user = new User("11111", "John", "Wick");
producer.writeToKafka(user);
// Read the message (John Wick user) with a test consumer from Kafka and assert its properties
ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
assertNotNull(message);
assertEquals("11111", message.key());
User result = objectMapper.readValue(message.value(), User.class);
assertNotNull(result);
assertEquals("John", result.getFirstName());
assertEquals("Wick", result.getLastName());
}
}
Run Code Online (Sandbox Code Playgroud)
这是本文中的一段代码,它使事情变得清晰。
归档时间: |
|
查看次数: |
6515 次 |
最近记录: |