Reactor Flux flatMap 算子吞吐量/并发控制并实现背压

Nav*_*mar 1 spring-boot project-reactor reactive-streams spring-webflux spring-webclient

我正在使用 Flux 来构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。如果我违反了每秒速率阈值,我将受到指数级的限制。每个系统都有自己的阈值。

我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。

在我的反应器管道中,WebClient 被包裹在 flatMap 中以执行 API 调用,如下面的代码:

WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string

    Flux.generator(generator) // Generator pushes the elements from source 1 at a time

    // make call to 1st API Service
    .flatMap(data -> getApiCall1)
    .map(api1Response -> api1ResponseModified)

    // make call to 2nd API Service
    .flatMap(api1ResponseModified -> getApiCall2)
    .map(api2Response -> api2ResponseModified)

// make call to 3rd API Service
.flatMap(api2ResponseModified -> getApiCall3)
.map(api3Response -> api3ResponseModified)

// rest of the pipeline operators

//end
.subscriber();
Run Code Online (Sandbox Code Playgroud)

问题是,如果我不将concurrency值设置为 flatMap,那么我的管道执行将在服务启动的几秒钟内突破阈值。如果我将 的值设置为 1、2、5、10 concurrency,那么吞吐量会变得非常低。

问题是,如果没有为并发设置任何值,我怎样才能实现应该遵守外部系统速率限制的背压?

Mic*_*rry 6

鉴于您有“每秒速率”要求,我会明确地对通量进行窗口化并将每个窗口限制在选定的时间段内。这将为您提供最大吞吐量而不会受到限制。

我会使用类似于以下内容的辅助函数:

public static <T> Flux<T> limitIntervalRate(Flux<T> flux, int ratePerInterval, Duration interval) {
    return flux
            .window(ratePerInterval)
            .zipWith(Flux.interval(Duration.ZERO, interval))
            .flatMap(Tuple2::getT1);
}
Run Code Online (Sandbox Code Playgroud)

这允许您执行以下操作:

sourceFlux
        .transform(f -> limitIntervalRate(f, 2, Duration.ofSeconds(1))) //Limit to a rate of 2 per second
Run Code Online (Sandbox Code Playgroud)

然后,您可以根据需要将其映射到您的WebClient调用上,同时尊重每个 API 的适当限制:

sourceFlux
        //...assume API 1 has a limit of 10 calls per second
        .transform(f -> limitIntervalRate(f, 10, Duration.ofSeconds(1)))
        .flatMap(data -> getApiCall1)
        .map(api1Response -> api1ResponseModified)

        //...assume API 2 has a limit of 20 calls per second
        .transform(f -> limitIntervalRate(f, 20, Duration.ofSeconds(1))) 
        .flatMap(api1ResponseModified -> getApiCall2)
        .map(api2Response -> api2ResponseModified)
Run Code Online (Sandbox Code Playgroud)

...等等。