SpringBoot 与 REACTOR kafka :增加 2CPUs pod 上的消息消耗吞吐量

Pat*_*Pat 1 java apache-kafka spring-boot spring-kafka reactor-kafka

关于带有 Reactor kafka 的 SpringBoot 3 应用程序的小问题。

我有一个小型反应式 kafka 消费者应用程序,它消耗来自 kafka 的消息并处理该消息。

该应用程序正在使用一个the-topic具有三个分区的主题。

该应用程序是docker化的,并且由于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更加困难,我只能运行该应用程序的一个唯一实例

该应用程序非常简单:

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
    </dependencies>
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class MyKafkaConfiguration {

    @Bean
    public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
        kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
        kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
        final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
        return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
    }

}
Run Code Online (Sandbox Code Playgroud)
@Service
public class MyConsumer implements CommandLineRunner {

    @Autowired
    private KafkaReceiver<String, String> kafkaReceiver;


    @Override
    public void run(String... args) {
        myConsumer().subscribe();
    }

    public Flux<String> myConsumer() {
        return kafkaReceiver.receive()
                .flatMap(oneMessage -> consume(oneMessage))
                .doOnNext(abc -> System.out.println("successfully consumed {}={}" + abc))
                .doOnError(throwable -> System.out.println("something bad happened while consuming : {}" + throwable.getMessage()));
    }

    private Mono<String> consume(ConsumerRecord<String, String> oneMessage) {
        // this first line is a heavy in memory computation which transforms the incoming message to a data to be saved.
        // it is very intensive computation, but has been tested NON BLOCKING by different tools, and takes 1 second :D
        String transformedStringCPUIntensiveNonButNonBLocking = transformDataNonBlockingWithIntensiveOperation(oneMessage);
        //then, just saved the correct transformed data into any REACTIVE repository :)
        return myReactiveRepository.save(transformedStringCPUIntensiveNonButNonBLocking);
    }

}

Run Code Online (Sandbox Code Playgroud)

如果我正确理解了项目反应堆,并且由于我的资源限制,我最多将拥有 2 个反应堆核心。

这里的consum方法已经测试为非阻塞,但是处理消息需要一秒钟的时间。

那么,我每秒只能消费2条消息吗?(希望不是)

这些消息可以按任何顺序使用,我希望通过这个单一应用程序最大化吞吐量。

请问如何在这些限制下最大限度地提高该应用程序的并行度/吞吐量?

谢谢

Ale*_*lex 5

我们可以应用利特尔定律来计算处理所需吞吐量所需的并发性。

workers >= throughput x latency,在我们的例子中workers是并行处理的许多消息

例如,要以 60 秒延迟每秒处理 100 条消息,我们需要同时处理 100 x 60 = 6000 个消息。在“传统”阻塞应用程序中,我们需要相同数量的线程。在反应式应用程序中,相同的工作负载只能由多个线程处理,因此占用的内存要少得多。即使处理一条消息需要 30-60 秒,线程也不会被阻塞,因为所有 IO 操作都是异步的。要扩展处理,您需要减少延迟或增加并发性。

在我们的例子中,我们需要并行处理 6000 个。通过 3 个分区,您可以让 3 个消费者分别并行处理 2000 条消息。

默认情况下,flatMap并行处理Queues.SMALL_BUFFER_SIZE = 256消息,但您可以对其进行配置。

kafkaReceiver.receive()
    .flatMap(oneMessage -> consume(oneMessage), concurrency)
Run Code Online (Sandbox Code Playgroud)

很难说一个应用程序可以处理多少条消息,您需要运行负载测试来了解最大吞吐量。尝试最大化此数字以了解您在指标方面的限制。如果应用程序无法处理此类负载,您将需要增加分区数量并部署更多消费者。

最终,您的目标是处理比您生成的消息更多的消息。如果生产者每秒发送 2 条消息,则不需要高并发性 (2 * 60 = 120)。

还有其他变量需要考虑 - 消息大小、下游系统的吞吐量、其他组件的限制。例如,WebClient/Netty 默认限制为 500 个并发连接。有时您甚至需要“减慢”消费者的速度,以免下游服务超载。