如何使用 spring-kafka 为侦听器传递多个引导服务器

dae*_*n54 5 apache-kafka kafka-consumer-api spring-kafka

我有一个侦听器,需要从多个具有相同主题的 kafka 服务器读取,这些服务器都配置在一个 zookeeper 下。我如何从这些多台服务器中读取数据。你能帮忙解决这个问题吗?

我可以指向zookeeper而不是Kafka服务器吗?

Art*_*lan 7

@KafkaListenerrequiresKafkaListenerContainerFactory @Bean又基于ConsumerFactory. 并且DefaultKafkaConsumerFactory接受Map<String, Object>消费者配置:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
        ...
        return props;
    }
}
Run Code Online (Sandbox Code Playgroud)

https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation

ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG正是标准的 Apache Kafkabootstrap.servers 属性

用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管此处指定哪些服务器用于引导——此列表仅影响用于发现完整服务器集的初始主机。此列表的格式应为 host1:port1,host2:port2,.... 由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集服务器(不过,您可能需要多个服务器,以防服务器宕机)。

不,你不能点Zookeeper地址。Kafka 不再支持这一点。