嗨,我现在正在涉及Spring Kafka并成功地向我的监听器添加了一个KafkaListenerContainerFactory.现在我想添加多个KafkaListenerContainerFactorys(一个用于在json中有消息的主题,另一个用于字符串).见下面的代码:
@EnableKafka
@Configuration
public class KafkaConsumersConfig {
private final KafkaConfiguration kafkaConfiguration;
@Autowired
public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
this.kafkaConfiguration = kafkaConfiguration;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(jsonConsumerFactory());
factory.setConcurrency(3);
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String,Record> jsonConsumerFactory(){
JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
}
@Bean
public Map<String,Object> jsonConsumerConfigs(){
Map<String,Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
return propsMap;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = …Run Code Online (Sandbox Code Playgroud)