寻找在 Spring Kafka 2.1.0 中使用自定义 ConsumerAwareRebalanceListener 的工作示例

Lam*_*bda 5 spring-kafka

根据文档:

ContainerProperties 有一个属性 consumerRebalanceListener,它采用了 Kafka 客户端的 ConsumerRebalanceListener 接口的实现。如果未提供此属性,容器将配置一个简单的日志侦听器,在 INFO 级别下记录重新平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener [...]

然而,似乎没有注入点或生命周期阶段来实际将自定义重新平衡侦听器分配给ContainerProperties.

就我而言,我使用的是自动配置,bean 主要由KafkaAnnotationDrivenConfiguration和提供KafkaAutoConfiguration

与许多 Spring 类和组件不同,无法通过定义某种类型的 bean 来设置自定义重新平衡侦听器,唯一的选择似乎是对现有 bean 进行黑客攻击和子类化。

或者我在这里遗漏了什么?

Art*_*lan 0

ConcurrentKafkaListenerContainerFactoryConfigurer你需要扩展、调用super.configure()和使用的:

ContainerProperties containerProperties = listenerContainerFactory
            .getContainerProperties();
Run Code Online (Sandbox Code Playgroud)

要访问@KafkaListener. 并且已经在那里你可以注入你的RebalanceListener.

正确的是:该习惯ConcurrentKafkaListenerContainerFactoryConfigurer必须声明为@Bean

@Bean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer(
        ConsumerAwareRebalanceListener rebalanceListener) {

    return new ConcurrentKafkaListenerContainerFactoryConfigurer() {

        @Override
        public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                ConsumerFactory<Object, Object> consumerFactory) {
            super.configure(listenerContainerFactory, consumerFactory);
            listenerContainerFactory.getContainerProperties()
                    .setConsumerRebalanceListener(rebalanceListener);
        }

    };

}
Run Code Online (Sandbox Code Playgroud)

确切地说,这个 bean 是从KafkaAnnotationDrivenConfiguration构建kafkaListenerContainerFactorybean 中使用的:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
Run Code Online (Sandbox Code Playgroud)