dae*_*n54 5 apache-kafka kafka-consumer-api spring-kafka
我有一个侦听器,需要从多个具有相同主题的 kafka 服务器读取,这些服务器都配置在一个 zookeeper 下。我如何从这些多台服务器中读取数据。你能帮忙解决这个问题吗?
我可以指向zookeeper而不是Kafka服务器吗?
而@KafkaListener
requiresKafkaListenerContainerFactory
@Bean
又基于ConsumerFactory
. 并且DefaultKafkaConsumerFactory
接受Map<String, Object>
消费者配置:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
Run Code Online (Sandbox Code Playgroud)
这ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
正是标准的 Apache Kafkabootstrap.servers
属性:
用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管此处指定哪些服务器用于引导——此列表仅影响用于发现完整服务器集的初始主机。此列表的格式应为 host1:port1,host2:port2,.... 由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集服务器(不过,您可能需要多个服务器,以防服务器宕机)。
不,你不能点Zookeeper地址。Kafka 不再支持这一点。
归档时间: |
|
查看次数: |
15141 次 |
最近记录: |