嗨,我目前正在使用 Docker 搭建 Kafka。我已经成功使用官方发布的 Confluent 镜像搭建了 Zookeeper 和 Kafka,请参阅以下 docker-compose 文件:
# Kafka stack built from the Confluent Platform 3.2.0 images:
# ZooKeeper, one Kafka broker and the Kafka REST proxy.
version: '2'
services:
  # ZooKeeper instance the broker and the REST proxy connect to.
  zookeeper:
    image: confluentinc/cp-zookeeper:3.2.0
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'
    restart: always

  # Kafka broker (broker id 1).
  kafka:
    image: confluentinc/cp-kafka:3.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Address advertised to clients.
      # NOTE(review): 192.168.99.100 is presumably the Docker host/VM
      # address; clients must be able to reach it - confirm.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://192.168.99.100:9092'
      # Was `LISTENERS`: the cp-kafka image only turns KAFKA_-prefixed
      # variables into broker properties, so the bare name was ignored.
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092'
    restart: always

  # Kafka REST proxy, published on port 8082.
  kafka-rest:
    image: confluentinc/cp-kafka-rest:3.2.0
    container_name: kafka-rest
    depends_on:
      - kafka
    ports:
      - '8082:8082'
    environment:
      KAFKA_REST_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_REST_LISTENERS: 'http://kafka-rest:8082'
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_REST_HOST_NAME: kafka-rest
    restart: always
schema-registry:
image: confluentinc/cp-schema-registry:3.2.0
container_name: schema-registry
depends_on:
- kafka
ports:
- '8081'
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: …

嗨,我目前正在学习 Spring Kafka,并且已经成功为我的监听器添加了一个 KafkaListenerContainerFactory。现在我想添加多个 KafkaListenerContainerFactory(一个用于消息为 JSON 的主题,另一个用于消息为字符串的主题)。请参见下面的代码:
@EnableKafka
@Configuration
public class KafkaConsumersConfig {
private final KafkaConfiguration kafkaConfiguration;
/**
 * Creates the configuration class and stores the injected Kafka settings
 * for use by the bean methods of this class.
 *
 * @param kafkaConfiguration settings object kept in the
 *                           {@code kafkaConfiguration} field
 */
@Autowired
public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
this.kafkaConfiguration = kafkaConfiguration;
}
/**
 * Listener container factory for records with {@code String} keys and
 * {@link Record} values.
 *
 * <p>The factory is backed by {@link #jsonConsumerFactory()}, uses a
 * concurrency of 3 and has auto-startup enabled.
 *
 * @return the configured container factory
 */
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Record> jsonContainers =
            new ConcurrentKafkaListenerContainerFactory<>();

    jsonContainers.setConsumerFactory(jsonConsumerFactory());
    jsonContainers.setConcurrency(3);
    jsonContainers.setAutoStartup(true);

    return jsonContainers;
}
/**
 * Consumer factory built from {@link #jsonConsumerConfigs()} that reads
 * keys with a {@link StringDeserializer} and values with a
 * {@link JsonDeserializer} targeting {@link Record}.
 *
 * @return a {@link DefaultKafkaConsumerFactory} for {@code String}/{@code Record} records
 */
@Bean
public ConsumerFactory<String, Record> jsonConsumerFactory() {
    final JsonDeserializer<Record> valueDeserializer = new JsonDeserializer<>(Record.class);
    return new DefaultKafkaConsumerFactory<>(
            jsonConsumerConfigs(),
            new StringDeserializer(),
            valueDeserializer);
}
/**
 * Consumer properties for the JSON consumer factory, all read from the
 * injected {@code kafkaConfiguration}: bootstrap servers, group id,
 * auto-commit flag, auto-commit interval and session timeout.
 *
 * @return a new mutable map keyed by {@link ConsumerConfig} property names
 */
@Bean
public Map<String, Object> jsonConsumerConfigs() {
    final Map<String, Object> consumerProps = new HashMap<>();

    // Connection and consumer-group identity.
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaConfiguration.getBrokerAddress());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaConfiguration.getJsonGroupId());

    // Offset-commit behaviour.
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            kafkaConfiguration.getAutoCommit());
    consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
            kafkaConfiguration.getAutoCommitInterval());

    // Session timeout.
    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
            kafkaConfiguration.getSessionTimeout());

    return consumerProps;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = …Run Code Online (Sandbox Code Playgroud)