Webflux 并行连接数以某种方式限制为 256

gua*_*ito 4 netty spring-boot spring-webflux

我有一个简单的服务器和客户端设置:

Flux.range(1, 5000)
        .subscribeOn(Schedulers.parallel())
        .flatMap(i -> WebClient.create()
            .method(HttpMethod.POST)
            .uri("http://localhost:8080/test")
            .body(Mono.just(String.valueOf(i)), String.class)
            .exchange())
        .publishOn(Schedulers.parallel())
        .subscribe(response ->
            response.bodyToMono(String.class)
                .publishOn(Schedulers.elastic())
                .subscribe(body -> log.info("{}", body)));
Run Code Online (Sandbox Code Playgroud)

这是客户:

@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
    return body.delayElement(Duration.ofSeconds(5));
}
Run Code Online (Sandbox Code Playgroud)

两者都运行在 netty 上。也许有人知道是什么导致了这种行为?

Bri*_*zel 5

这并不是由于WebClient连接池的限制,而是实际上来自于您可以更改的 Reactor 实现细节。

默认情况下,Reactor 运算符具有flatMapprefetch=32在最终订阅者请求之前我们请求的元素数量)和maxConcurrency=256(运算符同时处理的最大元素数量)。

您可以使用 的变体Flux.flatMap(Function mapper, int concurrency, int prefetch)来更改该行为。

subscribeOn您的代码片段使用了和的组合publishOn;我想说的是,鉴于您正在使用此代码片段进行反应式 I/O 工作,您不应该尝试在弹性/并行调度程序上安排工作。在这里最好删除这些运算符。