EmbeddedKafka w/ ContainerTestUtils.waitForAssignment 抛出:预期 1 但获得 0 个分区

dpa*_*mer 13 spring-kafka spring-kafka-test

我们有一个集成测试,我们使用 EmbeddedKafka 并向某个主题生成一条消息,我们的应用程序处理该消息,然后将结果发送到我们使用并断言输出的第二个主题。在 CI 中,这可能在 2/3 的时间内有效,但我们会遇到KafkaTestUtils.getSingleRecord抛出异常的情况java.lang.IllegalStateException: No records found for topic(参见下面的 [1])。

为了尝试解决此问题,我ContainerTestUtils.waitForAssignment为注册表中的每个侦听器容器添加了(请参阅下面的 [2])。在 CI 中成功运行几次后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions。现在我想知道这是否真的是原始异常“未找到记录”的根本原因。

有什么想法可以帮助解决这里的随机故障吗?我将不胜感激任何有关如何排除故障的建议。

spring-kafka 和 spring-kafka-test v2.6.4。

编辑:添加newConsumer以供参考。

我们的设置示例:

@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
    topics = { "topic1","topic2" },
    partitions = 1,
    brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Test
  public void testExample() {
    try (Consumer<String, String> consumer = newConsumer()) {
      for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        [2]
        ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
      }

      try (Producer<String, String> producer = newProducer()) {
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]

        producer.send(new ProducerRecord<>(
            "topic1",
            "test payload"));
        producer.flush();
      }

      String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
      assertEquals(result, "expected result");
    }
  }

  private Consumer<String, String> newConsumer() {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
    ConsumerFactory<String, AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerProps,
        new StringDeserializer(),
        new CustomDeserializer<>());
    return consumerFactory.createConsumer();
  }
}
Run Code Online (Sandbox Code Playgroud)

mr *_*oob 0

我有一个类似的问题,虽然我的期望是 24 个分区而不是 0。最后,这是因为我的 NewTopic Bean 有一个硬编码的分区数量,如下所示

@Bean
  public NewTopic domainEventsTopic(
      Topics topics,
      @Value("${spring.kafka.admin.replication}") short replicationFactor,) {
    return new NewTopic(topics.domainEvents(), 24, replicationFactor);
  }
Run Code Online (Sandbox Code Playgroud)

这与我的测试不符

ContainerTestUtils.waitForAssignment(container, partitions); // where partions is 1
Run Code Online (Sandbox Code Playgroud)

我的情况的修复方法是将 添加spring.kafka.admin.partitions到我的 application.yaml 中,然后创建一个 application-test.yaml 其中spring.kafka.admin.partitions1,而不是 24。并像这样更新 bean

  @Bean
  public NewTopic domainEventsTopic(
      Topics topics,
      @Value("${spring.kafka.admin.replication}") short replicationFactor,
      @Value("${spring.kafka.admin.partitions}") Integer partitions) {
    return new NewTopic(topics.domainEvents(), partitions, replicationFactor);
  }
Run Code Online (Sandbox Code Playgroud)