Tud*_*dor 7 java apache-kafka kafka-consumer-api
当我使用新的组 ID 注册消费者时,对 poll 的前 N 个调用不返回任何内容。
我想测试一下,当我调用服务时,是否发布了 Kafka 事件。问题是,每当我更改 groupId 时,前 N 次轮询都不返回任何内容。我知道Kafka在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)太随机了。
消费者配置:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
Run Code Online (Sandbox Code Playgroud)
脚步:
consumer.poll(Duration.ofSeconds(5))
只是为了确保消费者已注册并设置了偏移量。consumer.poll(Duration.ofSeconds(5))
,希望能收到一些记录。这是失败的一步。有没有办法确保第二次投票总是返回记录?我试图让第一次民意调查持续 1 分钟(我已经认为 5 秒对于每次测试来说都太长了),但它有时仍然有效,有时则无效。
谢谢。
Gre*_*i64 11
它不适用于您的“新 groupId”的原因是您处于“最新”模式。
默认值为“latest”,您需要处于“earliest”模式或第一次使用“new groupId”进行轮询,或为此主题的“new groupId”提交偏移量。
您需要将“groupId”注册到主题,而不是消费者。