Spring Kafka Embedded - 测试之间已经存在主题

Son*_*Son 2 java junit5 spring-kafka

我创建了一组带有嵌入式 kafka (spring-kafka-test) 的测试 (JUnit 5),当我有时(并非总是)运行它们时,我在单次运行的一个或多个测试中得到“主题 'some_name' 已存在”。

所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么,以及如何解决它。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
    private final static String SERVER_ADDRES = "127.0.0.1:9092";

    private Consumer<String, String> prepareConsumer() {
        Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton("some_name"));
        return consumer;
    }

    @Test
    public void someMethodWithKafka1() {
        // some logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }

    @Test
    public void someMethodWithKafka2() {
        // some other logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }
}
Run Code Online (Sandbox Code Playgroud)

Gar*_*ell 6

你有两个经纪人;您自己创建的一个:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
Run Code Online (Sandbox Code Playgroud)

以及一个由 Spring 管理的:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
Run Code Online (Sandbox Code Playgroud)

当您@EmbeddedKafka与 Spring 测试上下文一起使用时;代理被添加到上下文中。

将其更改为

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
Run Code Online (Sandbox Code Playgroud)

并且不要添加其他豆子。

一般来说,为每个测试使用不同的主题会更容易(也更快);避免为每个测试创建一个代理。

编辑

ports = 9092

使用随机端口代替(省略此配置)并使用

configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());
Run Code Online (Sandbox Code Playgroud)