Mop*_*per 5 java scala apache-kafka kafka-consumer-api
我有一个设置,其中有几个KafkaConsumers
分别处理单个主题上的多个分区。它们被静态分配了分区,以确保每个消费者都有相同数量的分区来处理。还选择了记录键,以便我们在所有分区上平均分配消息。
在负载很重的时候,我们经常看到少数分区建立了相当大的延迟(数千条消息/几分钟的价值),而其他获得相同负载并被同一消费者消耗的分区设法保持延迟低至几百条消息/几秒。
看起来消费者正在以最快的速度获取记录,绕过大多数分区,但有时会有一个分区被遗漏了很长时间。理想情况下,我希望延迟在各个分区中分布得更均匀。
我已经阅读KafkaConsumer
了一段时间关于轮询行为和配置的文章,到目前为止,我认为有两个选项可以解决这个问题:
KafkaConsumer.pause()
并.resume()
基本上强制KafkaConsumer
从滞后最大的分区读取KafkaConsumer
只订阅一个TopicPartition
,并使用KafkaConsumer
.这些选项似乎都不是处理此问题的正确方法。配置似乎也没有答案:
max.partition.fetch.bytes
仅指定单个分区的最大提取大小,不保证下一次提取将来自另一个分区。max.poll.interval.ms
仅适用于消费者组,而不适用于每个分区。我是否缺少鼓励KafkaConsumer
更频繁地切换分区的方法?或者一种对具有最高滞后的分区实施偏好的方法?
不确定答案是否仍然与您相关,或者我的答案是否完全满足您的需求,但是,您可以尝试使用滞后感知分配器。该分配器将分区分配给消费者,确保为消费者分配分区,以便消费者之间的滞后被统一/平等地分配。这是一个编写良好的代码,我用它实现了基于滞后的分配器。
https://github.com/grantneale/kafka-lag-based-assignor
您所需要做的就是配置您的消费者以使用此分配器。以下声明。
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, LagBasedPartitionAssignor.class.getName());
Run Code Online (Sandbox Code Playgroud)