Sha*_*aka 3 autoscaling apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,该主题发送的消息格式如下所示。
@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
log.info("Message : {} is received", message);
ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
如果您想要水平扩展超出分区数量并动态 - 考虑使用并行消费者(PC) 之类的东西。它可以在 Spring 上下文中使用。
通过使用 PC,您可以并行处理所有密钥,无论处理需要多长时间,并且您可以按照您的意愿并发处理 - 并且可以动态扩展。
PC 通过按键对输入分区进行子分区并并行处理每个键来直接解决此问题。它还跟踪每个记录的确认。查看GitHub 上的 Parallel Consumer(顺便说一句,它是开源的,我是作者)。
您可以设置该concurrency属性来运行更多线程;但每个分区只能由一个线程处理。要增加并发性,您必须增加每个主题中的分区数量。当在同一个监听器中监听多个主题时,如果这些主题只有一个分区,那么除非更改 kafka 消费者分区分配器,否则您可能无法获得所需的并发性。
当监听多个主题时,默认的分区分布可能不是你所期望的。例如,如果您有 3 个主题,每个主题有 5 个分区,并且您想要使用 concurrency=15,则您只会看到 5 个活动使用者,每个使用者从每个主题分配一个分区,而其他 10 个使用者处于空闲状态。这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor(请参阅其 Javadoc)。对于这种情况,您可能需要考虑使用 RoundRobinAssignor,它将分区分配给所有使用者。然后,为每个消费者分配一个主题或分区。...
| 归档时间: |
|
| 查看次数: |
7338 次 |
| 最近记录: |