嵌入式 Kafka 集成测试 - 消费者永远无法完成

Dom*_*ika 5 java apache-kafka spring-boot spring-kafka spring-kafka-test

我正在为一个简单的 Spring Boot 应用程序编写 Kafka 集成测试。该应用程序简单地发布到 Kafka 主题。

我正在使用嵌入式 Kafka 实例进行测试。当通过 Intellij 运行时,测试工作得很好,但当我通过 gradle 运行时,测试失败了。看起来latch倒计时永远不会达到 0,测试最终会超时。

生产者配置:

public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}
Run Code Online (Sandbox Code Playgroud)

制作人:

public class KafkaProducer {

    @Value(value = "kafka.topic-name")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
           future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }

    }

    public String getTopicName() {
        return topicName;
    }
Run Code Online (Sandbox Code Playgroud)

消费者:

@Component
public class KafkaConsumerHelper {
    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {

        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}
Run Code Online (Sandbox Code Playgroud)

测试:

@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;


    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {

        String message = createArticle();

        producer.sendMessage(message, topic);
        consumer.getLatch().await();

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }
Run Code Online (Sandbox Code Playgroud)

应用程序.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1
Run Code Online (Sandbox Code Playgroud)

知道是什么问题吗?

Dom*_*ika 8

对于将来看到这个问题的任何人来说,我的问题是我没有@EmbeddedKafka正确使用。

修复方法是将bootstrapServersProperty = "spring.kafka.bootstrap-servers"属性添加到@EmbeddedKafka注释中。

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
Run Code Online (Sandbox Code Playgroud)

更多信息请参见Kafka 文档

  • 文档网址已损坏,`bootstrapServersProperty = "spring.kafka.bootstrap-servers"` 是什么意思? (2认同)