Sma*_*hie 3 apache-kafka spring-boot spring-kafka spring-kafka-test
I created a unit test to test a Kafka listener, as shown below.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
class ConsumerTest {

    @Autowired
    KafkaTemplate<String, String> producer;

    @Test
    public void consumeEvents1Test() throws InterruptedException {
        producer.send("events1", "Sample message");
        // Give the listener time to receive the record before the test ends
        Thread.sleep(1000);
    }
}
The consumer is created as shown below.
@Component
public class Consumer {

    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

    // Header values are not all strings (the offset is a Long), so the map is typed as Object
    @KafkaListener(id = "${topic1}",
            topics = "${topic1}",
            groupId = "${consumer.group1}", concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
    public void consumeEvents1(String message, @Headers Map<String, Object> header, Acknowledgment acknowledgment) {
        LOG.info("Message - {}", message);
        LOG.info("{} {} {}", header.get(KafkaHeaders.GROUP_ID), header.get(KafkaHeaders.RECEIVED_TOPIC), header.get(KafkaHeaders.OFFSET));
        acknowledgment.acknowledge();
    }
}
The consumer factory and the container factory are created as shown below.
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootStrapServers);
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setAutoStartup(autoStart);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
The POM dependencies are:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
    </dependency>
</dependencies>
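However, when I run the test case, the message is published to the embedded Kafka broker, but the actual listener is never invoked. I'm not sure what is wrong with the test setup.
The application.properties is given below: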
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
consumer.group1=events-group1
topic1=events1
kafka.listener.autostart=true
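You are creating your own ConsumerFactory, so
spring.kafka.consumer.auto-offset-reset=earliest
is not applied: Boot's spring.kafka.consumer.* properties only configure the auto-configured consumer factory. Your consumer therefore falls back to Kafka's default, auto.offset.reset=latest. The record is published before the consumer starts, so there is a race condition: a consumer with no committed offset that joins after the send begins at the end of the partition and never sees the record.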
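To make the race concrete, here is a toy in-memory model of one partition. It is only a sketch of the offset-reset semantics described above, not Kafka's implementation; the class name `OffsetResetDemo`, the `poll` helper, and the `joinPosition` parameter are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition; illustrative only, not Kafka code. */
public class OffsetResetDemo {

    /**
     * Returns the records a consumer with no committed offset would read.
     * joinPosition is the log end offset at the moment the consumer joined.
     */
    public static List<String> poll(List<String> log, int joinPosition, String autoOffsetReset) {
        // "earliest" starts from offset 0; anything else is treated as "latest",
        // which starts from the end of the log as seen at join time.
        int start = "earliest".equals(autoOffsetReset) ? 0 : joinPosition;
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        log.add("Sample message");      // the test publishes first
        int joinPosition = log.size();  // the listener's consumer joins afterwards

        System.out.println("latest   -> " + poll(log, joinPosition, "latest"));
        System.out.println("earliest -> " + poll(log, joinPosition, "earliest"));
    }
}
```

In this model the "latest" consumer reads nothing because the only record sits before its starting position, while the "earliest" consumer reads the record regardless of when it joined.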
You need to set the property on your own consumer factory instead:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Alternatively, use Boot's auto-configured consumer factory.
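As a rough sketch of the first option, the property map of the hand-built factory from the question could look like the following once the offset-reset setting is included. To keep the example free of Kafka dependencies it uses the literal configuration key names, which are the string values behind the `ConsumerConfig` constants; the class name `ConsumerProps` and the `build` method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds the kind of map that would be passed to new DefaultKafkaConsumerFactory<>(props). */
public class ConsumerProps {

    public static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // Same settings as in the question, written with literal key names
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The setting the custom factory was missing: a consumer group with no
        // committed offset starts from the beginning of the partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").get("auto.offset.reset"));
    }
}
```

In the real configuration class the keys would normally be written as `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG`, `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, and so on, and the deserializers can be passed as `StringDeserializer.class` as in the question.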