小编Mar*_*ick的帖子

如何控制 Flux.flatMap (Mono) 的并行度?

下面的代码并行执行所有 Web 请求 (webClient),不考虑我输入的限制parallel(5)

        Flux.fromIterable(dataListWithHundredsElements)
            .parallel(5).runOn(Schedulers.boundedElastic())
            .flatMap(element -> 
                webClient.post().
                .bodyValue(element)
                .retrieve()
                .bodyToMono(String.class)
                .doOnError(err -> element.setError(Utils.toString(err)))
                .doOnSuccess(r -> element.setResponse(r))
            )
            .sequential()
            .onErrorContinue((e, v) -> {})
            .doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

我想知道是否可以根据中指定的值执行请求parallel(5)以及如何最好地执行请求?

一个细节,此代码是一个 Spring MVC 应用程序,我正在向外部服务发出请求。

更新 01

实际上 Flux 创建了 5 个线程,但是,所有请求(WebClient Mono)都是同时执行的。

我想要的是一次执行 5 个请求,所以当 1 个请求结束时,另一个请求会启动,但任何时候都不应该有 5 个以上的并行请求。

由于 Mono 也是一种响应式类型,在我看来 Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。

更新 02 - 外部服务日志

这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。

2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8 …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot project-reactor spring-webflux

1
推荐指数
1
解决办法
1346
查看次数