Spring Kafka 获取分配的分区

Mar*_*rio 5 java apache-kafka kafka-consumer-api spring-kafka

我知道我可以找出来自哪个分区记录,但我想知道有什么方法可以动态获取在特定时刻为消费者分配哪些分区?也许我需要实现一些监听器来检测和跟踪分区分配信息?

我正在使用spring-kafka 1.3.2ConcurrentKafkaListenerContainerFactory@KafkaListener

z0m*_*1ek 3

是的,你可以这样做:

@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
    return new ConsumerAwareRebalanceListener() { 
        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
           // here partitions
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

然后将其添加到例如 ConcurrentKafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

    ContainerProperties props = factory.getContainerProperties();
    props.setConsumerRebalanceListener(rebalanceListener());

    return factory;
}
Run Code Online (Sandbox Code Playgroud)