小编Tud*_*dor的帖子

Kafka consumer.poll 不返回任何记录

当我使用新的组 ID 注册消费者时,对 poll 的前 N ​​个调用不返回任何内容。

我想测试一下,当我调用服务时,是否发布了 Kafka 事件。问题是,每当我更改 groupId 时,前 N 次轮询都不返回任何内容。我知道Kafka在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)太随机了。

消费者配置:

Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
Run Code Online (Sandbox Code Playgroud)

脚步:

  1. 在每次测试之前,我调用consumer.poll(Duration.ofSeconds(5))只是为了确保消费者已注册并设置了偏移量。
  2. 我调用服务并对响应进行断言。如果我使用 UI 检查 Kafka,则会发布事件。
  3. 我打电话consumer.poll(Duration.ofSeconds(5)),希望能收到一些记录。这是失败的一步

有没有办法确保第二次投票总是返回记录?我试图让第一次民意调查持续 1 分钟(我已经认为 5 秒对于每次测试来说都太长了),但它有时仍然有效,有时则无效。

谢谢。

java apache-kafka kafka-consumer-api

7
推荐指数
1
解决办法
6944
查看次数

标签 统计

apache-kafka ×1

java ×1

kafka-consumer-api ×1