Mar*_*rio 5 java apache-kafka kafka-consumer-api spring-kafka
我知道我可以找出来自哪个分区记录,但我想知道有什么方法可以动态获取在特定时刻为消费者分配哪些分区?也许我需要实现一些监听器来检测和跟踪分区分配信息?
我正在使用spring-kafka 1.3.2
和ConcurrentKafkaListenerContainerFactory
。@KafkaListener
是的,你可以这样做:
@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
return new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// here partitions
}
};
}
Run Code Online (Sandbox Code Playgroud)
然后将其添加到例如 ConcurrentKafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
ContainerProperties props = factory.getContainerProperties();
props.setConsumerRebalanceListener(rebalanceListener());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4636 次 |
最近记录: |