小编Oma*_*uez的帖子

卡夫卡设置与docker-compose

嗨,我目前正在使用Docker设置Kafka.我已设法使用已发布的汇合图像设置Zookeeper和Kafka,请参阅以下docker-compose文件:

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:3.2.0
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always

  kafka:
    image: confluentinc/cp-kafka:3.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.99.100:9092
      LISTENERS: PLAINTEXT://0.0.0.0:9092
    restart: always

  kafka-rest:
   image: confluentinc/cp-kafka-rest:3.2.0
   container_name: kafka-rest
   depends_on:
     - kafka
   ports:
     - '8082:8082'
   environment:
     KAFKA_REST_ZOOKEEPER_CONNECT: 'zookeeper:2181'
     KAFKA_REST_LISTENERS: http://kafka-rest:8082
     KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
     KAFKA_REST_HOST_NAME: kafka-rest
   restart: always

 schema-registry:
   image: confluentinc/cp-schema-registry:3.2.0
   container_name: schema-registry
   depends_on:
     - kafka
   ports:
     - '8081'
   environment:
     SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker docker-compose confluent

14
推荐指数
2
解决办法
1万
查看次数

添加多个KafkaListenerContainerFactories时出现问题

嗨,我现在正在涉及Spring Kafka并成功地向我的监听器添加了一个KafkaListenerContainerFactory.现在我想添加多个KafkaListenerContainerFactorys(一个用于在json中有消息的主题,另一个用于字符串).见下面的代码:

@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-boot spring-kafka

6
推荐指数
2
解决办法
6575
查看次数