Java Kafka使用者组无法使用一些消息

tho*_*mas 6 java apache-kafka

注意到一个问题,其中Kafka消费者组(在java中实现)始终错过来自经纪人的一些消息.作为调试的第一线,通过kafka console consumer,我可以在代理中看到这些消息.

Kafka经纪人版:0.10.1.0

Kafka客户端版本:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

卡夫卡消费者配置:

Properties props = new Properties();
props.put("bootstrap.servers","broker1,broker2,broker3");
props.put("group.id", "myGroupIdForDemo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("heartbeat.interval.ms", "25000"); 
props.put("session.timeout.ms", "30000"); 
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "1");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.sync.time.ms", "10000");
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "earliest");
props.put("consumer.timeout.ms", "-1");
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "6000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Run Code Online (Sandbox Code Playgroud)

编辑 - 增加一些信息

我想补充一些信息:共有6个分区.但是,具有相同消费者组ID的主题的消费者总数为40.我知道有34位消费者闲着无所事事.

但是,我想要了解的一个方面是,如果消费者未能将代理发送的心跳发送到负载并重新分配分区,那么任何空闲消费者是否都有机会消费消息?此消息未被消耗的问题始终仅在某些分区中被注意到.我的意思是消息无法从同一分区传递/消费.

任何帮助表示赞赏.谢谢.

Vla*_*kov 5

a) 即使在 Kafka 中消息也可能不存在 - 在这种情况下,检查消息大小是否不超过 kafka 代理配置中允许的最大消息大小。

b) 如果您的消费者连接到 Kafka 实例 1 和 2-d 实例未连接,您可能会错过来自 2-d kafka 的消息:因此,请在消费者连接字符串中指定所有代理。

3)如果kafka上存在消息并且您已连接,则可能无法反序列化该消息,因此,尝试另一个反序列化器,可能不是字符串,而是字节数组,看看会发生什么,消息会被消耗吗?如果是,则转换为字符串有问题。

4)消息可能被另一个正在工作的消费者“窃取”,在同一组ID下工作,选择唯一的组ID。

5)您使用什么记录器来查看消耗的消息?您不怀疑这是记录器问题吗?

6)您可能会在消费者消费完所有消息之前杀死/停止消费者吗?

7)可能您消费了,但由于消费者内存限制而失败?可以增加-Xmx。(堆大小)