以下代码在receiverOptions和模板之间产生意外的循环依赖:
令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。
看起来某些自动配置正在添加从模板到接收器选项的不必要的依赖项。
请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Map<String, Object> kafkaProps() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
);
}
@Bean
public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
return ReceiverOptions.<String, String>create(kafkaProps)
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.subscription(List.of("test-topic"));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@Bean
public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
return args -> kafkaReceiver.receive()
.log()
.subscribe();
} …Run Code Online (Sandbox Code Playgroud)