nop*_*ope 3 unit-testing spring-kafka
问题
我正在尝试对发送整数的卡夫卡生产者进行单元测试。运行单元测试时,我可以看到输出表明我的生产者和消费者在控制台中正常工作。也就是说,生产者发送零值,消费者接收零值,并将该整数与运行总计相加。
sending data = 0
received content = 0
sending total = 0
Run Code Online (Sandbox Code Playgroud)
然而,当测试 ConsumerRecord 返回空条目时,单元测试最终失败。我猜监听器根本就没有工作。
java.lang.AssertionError:
Expected: a ConsumerRecord with value 0
but: is null
Run Code Online (Sandbox Code Playgroud)
问题
我定义容器/消息监听器的方式是否有错误?或者我的单元测试有更根本的错误?
这个url包含我的生产者/配置和消费者/配置的代码。在那里查看它可能比在这里让代码部分变得更大更容易。
https://github.com/ewingian/RestCalculator/tree/master/src/main/java/com/calculator/kafka
单元测试
package com.calculator;
/**
* Created by ian on 2/9/18.
*/
import com.calculator.kafka.services.KafkaProducer;
import com.calculator.kafka.services.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.*;
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {
// in case I need to send some integers
private Integer i1 = 0;
private Integer i2 = 3;
private static final String SENDER_TOPIC = "input";
private List<Integer> l1;
@Autowired
private KafkaProducer producer;
@Autowired
private KafkaConsumer consumer;
private KafkaMessageListenerContainer<String, Integer> container;
private BlockingQueue<ConsumerRecord<String, Integer>> records;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Before
public void testTemplate() throws Exception {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
// set the topic that needs to be consumed
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, Integer>() {
@Override
public void onMessage(ConsumerRecord<String, Integer> record) {
LOGGER.debug("test-listener received message='{}'", record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
@After
public void tearDown() {
// stop the container
container.stop();
}
@Test
public void testSend() throws InterruptedException {
// send the message
producer.send(i1);
// check that the message was received
ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
// Hamcrest Matchers to check the value
assertThat(received, hasValue(i1));
// AssertJ Condition to check the key
assertThat(received, hasKey(null));
}
}
Run Code Online (Sandbox Code Playgroud)
您需要使用正确的键/值序列化器/反序列化器。KTU分别为Integer/String设置,需要String/Integer。
您的测试用例的修改版本有效......
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {
// in case I need to send some integers
private final Integer i1 = 0;
private final Integer i2 = 3;
private static final String SENDER_TOPIC = "input";
private List<Integer> l1;
private KafkaMessageListenerContainer<String, Integer> container;
private BlockingQueue<ConsumerRecord<String, Integer>> records;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Before
public void testTemplate() throws Exception {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProperties);
// set the topic that needs to be consumed
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, Integer>() {
@Override
public void onMessage(ConsumerRecord<String, Integer> record) {
LOGGER.debug("test-listener received message='{}'", record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
@After
public void tearDown() {
// stop the container
container.stop();
}
@Test
public void testSend() throws InterruptedException {
// send the message
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
ProducerFactory<String, Integer> pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<String, Integer> template = new KafkaTemplate<>(pf);
template.send(SENDER_TOPIC, i1);
// check that the message was received
ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
// Hamcrest Matchers to check the value
assertThat(received, hasValue(i1));
// AssertJ Condition to check the key
assertThat(received, hasKey(null));
System.out.println(received);
}
}
Run Code Online (Sandbox Code Playgroud)
编辑
回应您在下面的评论...
您的生产者未连接到嵌入式(测试)代理。这对我来说效果很好:
@Configuration
public class KafkaProducerConfig {
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + ":localhost:9092}")
private String bootstrapServer;
@Bean
public ProducerFactory<String, Integer> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Integer> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
和
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaTest {
// in case I need to send some integers
private final Integer i1 = 42;
private final Integer i2 = 3;
private static final String SENDER_TOPIC = "input";
private List<Integer> l1;
@Autowired
private KafkaProducer producer;
private KafkaMessageListenerContainer<String, Integer> container;
private BlockingQueue<ConsumerRecord<String, Integer>> records;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Before
public void testTemplate() throws Exception {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProperties);
// set the topic that needs to be consumed
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, Integer>() {
@Override
public void onMessage(ConsumerRecord<String, Integer> record) {
LOGGER.debug("test-listener received message='{}'", record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
@After
public void tearDown() {
// stop the container
container.stop();
}
@Test
public void testSend() throws InterruptedException {
// send the message
producer.send(i1);
// check that the message was received
ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
// Hamcrest Matchers to check the value
assertThat(received, hasValue(i1));
// AssertJ Condition to check the key
assertThat(received, hasKey(null));
System.out.println(received);
}
}
Run Code Online (Sandbox Code Playgroud)
和
ConsumerRecord(topic = input, partition = 1, offset = 0, CreateTime = 1519487134283, checksum = 866641474, serialized key size = -1, serialized value size = 4, key = null, value = 42)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
8822 次 |
最近记录: |