spring-kafka KafkaListener 中的并行处理和自动缩放

Sha*_*aka 3 autoscaling apache-kafka spring-boot kafka-consumer-api spring-kafka

我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,该主题发送的消息格式如下所示。

    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }
Run Code Online (Sandbox Code Playgroud)
  • KafkaListener能否根据自己监听的主题数量来分配消费者线程数量,并并行处理两个主题中的消息?或者它不支持并行处理,消息必须在主题中等待,直到一条消息得到处理?
  • 如果主题中的消息数量较多,我需要自动缩放微服务以启动新实例(直到分区数量)。从 KafkaListener 的角度来看,我可以依靠哪些参数(CPU、内存)来找出主题中的消息数量更高?(即在 API 中,我可以通过监控 HTTP 延迟来自动扩展服务)

Ant*_*bbs 6

如果您想要水平扩展超出分区数量并动态 - 考虑使用并行消费者(PC) 之类的东西。它可以在 Spring 上下文中使用。

通过使用 PC,您可以并行处理所有密钥,无论处理需要多长时间,并且您可以按照您的意愿并发处理 - 并且可以动态扩展。

PC 通过按键对输入分区进行子分区并并行处理每个键来直接解决此问题。它还跟踪每个记录的确认。查看GitHub 上的 Parallel Consumer(顺便说一句,它是开源的,我是作者)。


Gar*_*ell 5

您可以设置该concurrency属性来运行更多线程;但每个分区只能由一个线程处理。要增加并发性,您必须增加每个主题中的分区数量。当在同一个监听器中监听多个主题时,如果这些主题只有一个分区,那么除非更改 kafka 消费者分区分配器,否则您可能无法获得所需的并发性。

请参阅https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#using-ConcurrentMessageListenerContainer

当监听多个主题时,默认的分区分布可能不是你所期望的。例如,如果您有 3 个主题,每个主题有 5 个分区,并且您想要使用 concurrency=15,则您只会看到 5 个活动使用者,每个使用者从每个主题分配一个分区,而其他 10 个使用者处于空闲状态。这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor(请参阅其 Javadoc)。对于这种情况,您可能需要考虑使用 RoundRobinAssignor,它将分区分配给所有使用者。然后,为每个消费者分配一个主题或分区。...