How do I write a unit test for a @KafkaListener?

Abb*_*ese 1 apache-kafka spring-kafka

I'm trying to figure out whether I can write a unit test for a @KafkaListener using spring-kafka and spring-kafka-test.

My listener class:

public class MyKafkaListener {

    @Autowired
    private MyMessageProcessor myMessageProcessor;

    @KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
    public void myMessageListener(MyMessage message) {
        myMessageProcessor.process(message);
        log.info("MyMessage processed");
    }
}

My test class:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {

    @Autowired
    private MyMessageProcessor myMessageProcessor;

    @Value("${kafka.topic.01}")
    private String TOPIC_01;

    @Autowired
    private KafkaTemplate<String, MyMessage> messageProducer;

    @Test
    public void testSalesforceMessageListner() {
        MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
        messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
        verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
    }
}

My test configuration class:

@Configuration
@EnableKafka
public class TestKafkaConfig {

    @Bean
    public MyMessageProcessor myMessageProcessor() {
        return mock(MyMessageProcessor.class);
    }

    @Bean
    public KafkaEmbedded kafkaEmbedded() {
        return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
    }

    //Consumer
    @Bean
    public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(myMessageConsumerFactory());
        return factory;
    }

    //Producer
    @Bean
    public ProducerFactory<String, MyMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, MyMessage> messageProducer() {
        return new KafkaTemplate<>(producerFactory());
    }
}

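Is there a simple way to make this work?

Or should I test the @KafkaListener some other way? In a unit test, how do I ensure that the @KafkaListener is invoked when a new message arrives in Kafka?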

Art*_*lan 8

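"How do I ensure that the @KafkaListener is invoked when a new message arrives in Kafka?"

Well, testing that kind of functionality is essentially the framework's responsibility. In your case you only need to concentrate on the business logic and unit test your own custom code, not the code that ships with the framework. Besides, there is little point in testing a @KafkaListener method that just logs incoming messages, and it would be hard to find a hook for the test case to verify against.

On the other hand, I expect the business logic in your @KafkaListener method is much more complicated than what you show. So it is probably better to verify the custom code called from that method (for example a DB insert or a call to some other service) than to try to find a hook for myMessageListener() itself.

What you do with mock(MyMessageProcessor.class) is a good way to verify the business logic. The only thing wrong in your code is the duplicated embedded Kafka: you use the @EmbeddedKafka annotation and you also declare a KafkaEmbedded @Bean in the configuration. You should remove one of them. It isn't clear where your production code is, although it should be free of any embedded Kafka. Otherwise, if everything is in test scope, I don't see any problem with your consumer and producer factory configuration. You have about the minimal possible configuration for @KafkaListener and KafkaTemplate. All you need to do is remove the @EmbeddedKafka so that the broker is not started twice.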
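
To make the point about testing your own code concrete, here is a minimal sketch that calls the listener method directly, with a hand-written recording fake playing the role of the Mockito mock. The names `MessageProcessor`, `Listener` and `RecordingProcessor` are hypothetical stand-ins for the classes in the question, the payload is simplified to a String, and neither Kafka nor Spring is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: exercises the listener method directly, no broker needed.
class DirectListenerTestSketch {

    // Calls the listener once and returns what the fake processor recorded.
    static List<String> invokeListenerOnce(String payload) {
        RecordingProcessor processor = new RecordingProcessor();
        Listener listener = new Listener(processor);
        listener.onMessage(payload); // what the framework would do when a message arrives
        return processor.received;
    }

    public static void main(String[] args) {
        List<String> received = invokeListenerOnce("hello");
        if (received.size() != 1 || !received.get(0).equals("hello")) {
            throw new AssertionError("processor should have been called exactly once");
        }
        System.out.println("processor invocations: " + received.size());
    }
}

// Stand-in for MyMessageProcessor from the question (assumed shape).
interface MessageProcessor {
    void process(String message);
}

// Stand-in for the listener class; constructor injection keeps it easy to test.
class Listener {
    private final MessageProcessor processor;

    Listener(MessageProcessor processor) {
        this.processor = processor;
    }

    // In the real class this method would carry the @KafkaListener annotation.
    void onMessage(String message) {
        processor.process(message);
    }
}

// Hand-rolled fake that records every call, in place of a Mockito mock.
class RecordingProcessor implements MessageProcessor {
    final List<String> received = new ArrayList<>();

    @Override
    public void process(String message) {
        received.add(message);
    }
}
```

A test like this covers the code you wrote and leaves message delivery to the framework's own test suite.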


Gar*_*ell 5

You can wrap the listener in your test case.

Given

@SpringBootApplication
public class So52783066Application {

    public static void main(String[] args) {
        SpringApplication.run(So52783066Application.class, args);
    }

    @KafkaListener(id = "so52783066", topics = "so52783066")
    public void listen(String in) {
        System.out.println(in);
    }

}

then

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

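    // Set the property before the Spring context is created; the @ClassRule has already started the broker.
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());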
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                .getListenerContainer("so52783066");
        container.stop();
        @SuppressWarnings("unchecked")
        AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
                .getContainerProperties().getMessageListener();
        CountDownLatch latch = new CountDownLatch(1);
        container.getContainerProperties()
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {

                    @Override
                    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        messageListener.onMessage(data, acknowledgment, consumer);
                        latch.countDown();
                    }

                });
        container.start();
        template.send("so52783066", "foo");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

}
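
The wrapping idea in the test above can be sketched without any Kafka classes: decorate the existing listener so that a CountDownLatch is released once the original has handled the message, then wait on the latch with a timeout. In this sketch a plain `Consumer<String>` stands in for the Spring Kafka message listener and a single-thread executor stands in for the listener container; both are assumptions made purely for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchWrapperSketch {

    // Decorates a listener: delegate first, then release the latch.
    static Consumer<String> wrapWithLatch(Consumer<String> original, CountDownLatch latch) {
        return message -> {
            original.accept(message);
            latch.countDown();
        };
    }

    // Delivers one message on another thread (as a container would) and
    // reports whether the wrapped listener handled it within the timeout.
    static boolean deliverAndAwait(String message, long timeoutMillis) {
        StringBuilder seen = new StringBuilder();
        Consumer<String> original = m -> seen.append(m); // hypothetical production listener
        CountDownLatch latch = new CountDownLatch(1);
        Consumer<String> wrapped = wrapWithLatch(original, latch);

        ExecutorService containerThread = Executors.newSingleThreadExecutor();
        try {
            containerThread.execute(() -> wrapped.accept(message));
            boolean released = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return released && seen.toString().equals(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            containerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        if (!deliverAndAwait("foo", 2_000)) {
            throw new AssertionError("listener was not invoked in time");
        }
        System.out.println("listener invoked");
    }
}
```

Because the latch is counted down only after the delegate returns, a successful await also tells the test that the original listener finished its work.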


Lau*_*ulo 5

Here is my working solution for the consumer, based on your code. Thank you :-)

The configuration is as follows:

@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {

    private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    @Primary
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

        log.info("Consumer TEST config = {}", props);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        log.info("Producer TEST config = {}", props);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<String>());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
        return pf;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

    @Bean
    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;
    }
}

Put all the beans that the test needs in a separate class:

@TestConfiguration
@Profile("kafka_test")
@EnableKafka
public class KafkaBeansConfig {

    @Bean
    public MyProducer myProducer() {
        return new MyProducer();
    }

    // more beans
}

I created a BaseKafkaConsumerTest class so that it can be reused:

@ExtendWith(SpringExtension.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddresses;

    @Autowired
    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    protected KafkaTemplate<String, String> senderTemplate;

    public void setUp() {
        embeddedKafka.brokerProperty("controlled.shutdown.enable", true);

        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            System.err.println(messageListenerContainer.getContainerProperties().toString());
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());
        }
    }

    @AfterAll
    public void tearDown() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            messageListenerContainer.stop();
        }

        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
        embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }

}

Extend the base class to test your consumer:

@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {

    private static Logger log = LoggerFactory.getLogger(MYKafkaConsumerTest.class);

    @Autowired
    private MyConsumer myConsumer;

    // mocks with @MockBean

    @Configuration
    @ComponentScan({ "com.myfirm.kafka" })
    static class KafkaLocalTestConfig {
    }

    @BeforeAll
    public void setUp() {
        super.setUp();
    }

    @Test
    public void testMessageIsReceived() throws Exception {

        //mocks

        String jsonPayload = "{\"id\":\"12345\",\"cookieDomain\":\"helloworld\"}";
        ListenableFuture<SendResult<String, String>> future =
            senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

        // Give the listener time to consume the message before verifying.
        Thread.sleep(10000);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("successfully sent message='{}' with offset={}", jsonPayload,
                    result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("unable to send message='{}'", jsonPayload, ex);
            }
        });

        Mockito.verify(myService, Mockito.times(1))
            .update(Mockito.any(MyDetails.class));
    }
}

As I have read in other posts, don't test the business logic this way. Only check that the calls are made.
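
One possible refinement, offered as a sketch and not as part of the original solution: the fixed Thread.sleep(10000) in the test always costs ten seconds, even when the message is consumed almost immediately. A bounded poll returns as soon as the expected call has been observed. The helper `awaitCondition` below is hypothetical, and an AtomicInteger stands in for "the mocked service was called". With Mockito, `Mockito.verify(mock, Mockito.timeout(millis))` gives a similar bounded wait.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses; returns the final result.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        try {
            while (System.nanoTime() < deadline) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(pollMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger(); // stands in for the mocked service
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated consumption delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            calls.incrementAndGet();
        });
        consumer.start();

        boolean called = awaitCondition(() -> calls.get() == 1, 2_000, 10);
        consumer.join();
        if (!called) {
            throw new AssertionError("service was not called within the timeout");
        }
        System.out.println("service called within timeout");
    }
}
```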
