Spring Kafka-试图了解幕后的工作原理

Ind*_*nde 1 java spring spring-kafka

考虑以下代码 -

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我创建了一个消费者工厂和一个concurrentKafkaListenercontainer Factory.我没有为监听器Factory设置并发性.我有一个用@KafkaListener注释的方法

@KafkaListener(topics = "topicName")
public void listen(String message) {
    System.out.println("Received Message: " + message);
Run Code Online (Sandbox Code Playgroud)

当我不设置并发属性时,Spring会创建1个消费者实例,1个kafka监听器容器属于消费者工厂中指定的组吗?

如果我将并发性更改为3,那么Spring会创建3个消费者实例,因此在配置消费者工厂和3个侦听器容器时,同一个消费者组中有3个消费者吗?

此外,根据并发性,我们假设我们现在只听一个主题,我们将有3个用@kafkalistener注释的方法,如果没有指定分区,则所有3个方法都会监听不同的分区(由kafka以循环方式提供).?

我是卡夫卡的新手,想澄清我的理解.

Dan*_*ani 5

当我不设置并发属性时,Spring会创建1个消费者实例,1个kafka监听器容器属于消费者工厂中指定的组吗?

您将有一个使用者从该主题的所有分区中获取事件.

如果我将并发性更改为3,那么Spring会创建3个消费者实例,因此在配置消费者工厂和3个侦听器容器时,同一个消费者组中有3个消费者吗?

您将拥有3个使用者实例,如果该主题中至少有3个分区,则每个实例都将从其中一个分区中获取事件.消费者将事件传递给该KafkaListener实例.

你可以更具体.

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToParition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Messasge: " + message"
        + "from partition: " + partition);
}
Run Code Online (Sandbox Code Playgroud)

此外,根据并发性,我们假设我们现在只听一个主题,我们将有3个用@kafkalistener注释的方法,如果没有指定分区,则所有3个方法都会监听不同的分区(由kafka以循环方式提供).?

这毫无意义.首先,KafkaListeners是Spring Kafka的高级抽象,Kafka根本没有循环(从消费者的角度来看,它与制作人不同),如果你有3个消费者(同一个消费者群体+正在收听)相同的主题),以及主题中的3个分区,Kafka将重新平衡并为一个消费者分配一个分区,每个消费者将仅从Kafka分配的分区中获取事件.Spring Kafka在收到每个消费者的事件后,将在KafkaListener实例中传递事件.

  • 如果您有 3 个“@KafkaListener”,它们将分别从配置的主题中分配分区。如果他们有相同的主题和不同的“group.id”,他们将获得每条消息的副本。如果它们具有相同的“group.id”,并且正在使用组管理,Kafka 将跨实例分配主题/分区。如果每个主题只有1个分区,则不能保证每个主题都会获得1个分区;最有可能的是,一个实例将从每个主题获取单个分区,而其他 2 个实例将处于空闲状态。 (2认同)