@KafkaListener 与 ConsumerFactory groupId

Vin*_*Vin 3 apache-kafka kafka-consumer-api spring-kafka

我遵循了 baeldung.com 的“ Apache Kafka with Spring 简介”教程。我用以下方法设置了一个KafkaConsumerConfigkafkaConsumerFactory

private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}
Run Code Online (Sandbox Code Playgroud)

和两个“定制”工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("foo");
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
    return kafkaListenerContainerFactory("bar");
}
Run Code Online (Sandbox Code Playgroud)

MessageListener课堂上,我使用@KafkaListener注释来注册消费者,以groupId监听某个主题:

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group 'foo': " + message);
    ...
}

@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group 'bar': " + message);
    ...
}
Run Code Online (Sandbox Code Playgroud)

这样就有两组消费者,一组具有groupId“foo”,一组具有groupId“bar”。

fooKafkaListenerContainerFactory现在,如果我以barKafkaListenerContainerFactory这种方式将“foo”消费者的容器工厂从 改为

@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
    ...
}
Run Code Online (Sandbox Code Playgroud)

容器工厂和容器工厂groupId之间似乎不兼容,但没有任何变化。所以,我试图理解的是财产的作用以及为什么它似乎没有被考虑。KafkaListenergroupIdprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

Gar*_*ell 6

工厂groupId是默认值,仅groupId当.id@KafkaListener

在早期版本中,只能在工厂上设置 groupId,这意味着如果需要不同的组,则需要为每个侦听器建立一个单独的工厂,这违背了可用于多个侦听器的工厂的想法。

请参阅java文档...

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";

/**
 * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
 * provided) as the {@code group.id} property for the consumer. Set to false, to use
 * the {@code group.id} from the consumer factory.
 * @return false to disable.
 * @since 1.3
 */
boolean idIsGroup() default true;
Run Code Online (Sandbox Code Playgroud)