J.J*_*eam 4 apache-kafka kafka-consumer-api kafka-partition
假设Kafka
, 1 partition
, 2 consumers
.(第二个消费者空闲)
假设第一个服务消耗了一条消息,并与其他 3 个服务一起处理它,然后突然粘在其中一个服务上并错过了 Kafka 的超时。
Kafka 是否会将分区重新指定给第二个消费者,并且消息将被双重处理(假设第一个消费者最终成功)?
如果 Kafka 的消费者处理消息的时间太长怎么办?Kafka会将此分区重新指定给另一个消费者并且消息将被双重处理吗?
对,那是正确的。如果 Kafka 消费者处理消息的时间过长,并且后续的 poll() 被延迟,那么 Kafka 会将该分区重新指定给另一个消费者,并且消息将被再次处理(一次又一次)。
为了更清楚起见,首先我们需要决定并定义“多长才算太长?”。
这是由属性定义的max.poll.interval.ms
。从文档中,
使用消费者组管理时,调用 poll() 之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间设置了上限。如果在此超时到期之前未调用 poll(),则消费者将被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。
如果在此时间内没有调用 poll(),消费者组将重新平衡。
还有一处房产auto.commit.interval.ms
。自动提交偏移量检查只会在轮询期间调用 - 它检查经过的时间是否大于配置的自动提交间隔时间,如果结果为是,则提交偏移量。
如果 Kafka 消费者处理记录的时间过长,则后续的 poll() 调用也会延迟,并且上一次 poll() 返回的偏移量不会提交。如果此时发生重新平衡,分配给该分区的新消费者客户端将再次开始处理消息。
通过增加该值可以避免消费者组重新平衡和由此产生的分区重新分配。这将增加轮询之间允许的间隔,并为消费者提供更多时间来处理从 poll() 返回的记录。消费者只会在轮询调用中加入重新平衡,因此增加最大轮询间隔也会延迟组重新平衡。
将最大轮询间隔增加到一个大值还有一个问题。如果消费者因其他原因死亡,则需要比配置的max.poll.interval.ms
时间间隔更长的时间来检测故障。
session.timeout.ms
在这种情况下,可以heartbeat.interval.ms
尽早检测到总体故障。
有关这些参数的更多详细信息:
请注意,配置的值session.timeout.ms
必须在代理配置中属性配置的允许范围内
否则,启动消费者客户端时将抛出以下异常。
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
Run Code Online (Sandbox Code Playgroud)
更新:避免再次处理消息
KafkaConsumer类中有另一个方法commitAsync()
来触发提交偏移操作。
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
Run Code Online (Sandbox Code Playgroud)
有关 commitSync() 和 commitAsync() 的更多详细信息,请查看此线程
手动提交偏移量是指该偏移量已被处理,这样 Kafka 就不会再次发送同一分区的已提交记录。当手动提交偏移量时,需要注意的是,如果消费者因任何原因在处理记录之前死亡,则这些记录有可能不会被再次处理。
归档时间: |
|
查看次数: |
5658 次 |
最近记录: |