Kafka 消费者分配返回空集

roo*_*kie 4 java apache-kafka kafka-consumer-api

我订阅了一个 Kafka 主题,如下所示。只有在为消费者分配了分区后,我才需要运行一些逻辑。

然而,consumer.assignment()无论我等多久,它都会作为一个空集返回。如果我没有 while 循环,然后执行 a consumer.poll()我确实从主题中获取记录。谁能告诉我为什么会发生这种情况?

    consumer.subscribe(topics);
    Set<TopicPartition> assigned=Collections.emptySet();
    while(isAssigned) {
          assigned = consumer.assignment();
          if(!assigned.isEmpty()) {
              isAssigned= false;
          }
    }

//consumer props
Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,yyy:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://xxx:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("max.poll.records", "100");
Run Code Online (Sandbox Code Playgroud)

Mic*_*son 6

在您调用 之前poll(),消费者只是闲置。

只有在poll()被调用后,它才会启动与集群的连接,获取分配的分区并尝试获取消息。

这是在Javadoc 中提到的:

订阅一组主题后,消费者将在调用 poll(Duration) 时自动加入该组。

一旦您开始调用poll(),您就可以使用assignment()甚至注册 aConsumerRebalanceListener以在从您的消费者实例分配或撤销分区时进行通知。

  • 以防万一这对其他人有帮助:您可能会在不调用 poll 的情况下随机被分配,但这种情况很少见。如果您不知道需要民意调查,这可能会非常令人困惑。 (3认同)