带弹簧启动的简单嵌入式Kafka测试示例

Yun*_*ska 22 apache-kafka spring-boot spring-kafka

编辑FYI:工作gitHub示例


我在互联网上搜索,找不到嵌入式Kafka测试的工作简单示例.

我的设置是:

  • 春天的靴子
  • 多个@KafkaListener,在一个类中包含不同的主题
  • 嵌入式Kafka进行测试,开始很好
  • 使用Kafkatemplate进行测试,发送到主题,但 @KafkaListener方法即使在很长的睡眠时间后也没有收到任何内容
  • 没有显示警告或错误,只有日志中来自Kafka的垃圾邮件

请帮我.主要是过度配置或过度设计的示例.我相信它可以做得很简单.多谢你们!

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}
Run Code Online (Sandbox Code Playgroud)

private static String SENDER_TOPIC ="test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        Thread.sleep(10000);
    }
Run Code Online (Sandbox Code Playgroud)

May*_*yur 24

嵌入式Kafka测试适用于以下配置,

测试类的注释

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
Run Code Online (Sandbox Code Playgroud)

在设置方法的注释之前

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}
Run Code Online (Sandbox Code Playgroud)

注意:我不是@ClassRule用来创建嵌入式Kafka而是自动布线
@Autowired embeddedKafka

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助!

编辑:测试配置类标记为 @TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}
Run Code Online (Sandbox Code Playgroud)

现在@Test方法将自动装配KafkaTemplate并使用is发送消息

kafkaTemplate.send(topic, data);
Run Code Online (Sandbox Code Playgroud)

使用上面一行更新了答案代码块

  • 最好将TestConfig声明为KafkaConsumerTest中的内部类。在这种情况下:a)必须为静态b)必须将“ KafkaEmbedded”作为方法“ producerFactory”的参数注入c)将“ ProducerFactory”作为方法“ kafkaTemplate”的参数注入,然后使用它代替调用`producerFactory()`。 (2认同)
  • “setup()”方法和“ContainerTestUtils.waitForAssignment(..)”对我们来说是黄金。我们遇到消费者在另一个测试类之后挂起,这导致下一个测试的消费者没有收到任何东西。我们还使用`@DirtiesContext(AFTER_CLASS)` (2认同)

Laz*_*azR 12

因为接受的答案对我不起作用。我找到了另一个基于https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/ 的解决方案,我想与您分享。

依赖项是“spring-kafka-test”版本:“2.2.7.RELEASE”

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {

    private static final String TEST_TOPIC = "testTopic";

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testReceivingKafkaEvents() {
        Consumer<Integer, String> consumer = configureConsumer();
        Producer<Integer, String> producer = configureProducer();

        producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));

        ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo(123);
        assertThat(singleRecord.value()).isEqualTo("my-test-value");

        consumer.close();
        producer.close();
    }

    private Consumer<Integer, String> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
                .createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 你正在测试嵌入式kafka本身吗? (3认同)

Yun*_*ska 6

我现在解决了这个问题

@BeforeClass
public static void setUpBeforeClass() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
Run Code Online (Sandbox Code Playgroud)

在调试时,我看到嵌入式kaka服务器正在使用随机端口。

我找不到它的配置,所以我将kafka配置设置为与服务器相同。对我来说看起来还是有点难看。

我希望只提到@Mayur行

@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
Run Code Online (Sandbox Code Playgroud)

但无法在互联网上找到正确的依赖关系。

  • 您可以在application.properties中为测试设置spring.kafka.bootstrap-servers = $ {spring.embedded.kafka.brokers},这应该可以工作。这是从EmbeddedKafka用启动时分配的随机端口填充的。 (2认同)