Mar*_*ick 1 spring spring-boot project-reactor spring-webflux
下面的代码并行执行所有 Web 请求 (webClient),不考虑我输入的限制parallel(5)。
Flux.fromIterable(dataListWithHundredsElements)
.parallel(5).runOn(Schedulers.boundedElastic())
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r))
)
.sequential()
.onErrorContinue((e, v) -> {})
.doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
我想知道是否可以根据中指定的值执行请求parallel(5)以及如何最好地执行请求?
一个细节,此代码是一个 Spring MVC 应用程序,我正在向外部服务发出请求。
更新 01
实际上 Flux 创建了 5 个线程,但是,所有请求(WebClient Mono)都是同时执行的。
我想要的是一次执行 5 个请求,所以当 1 个请求结束时,另一个请求会启动,但任何时候都不应该有 5 个以上的并行请求。
由于 Mono 也是一种响应式类型,在我看来 Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。
更新 02 - 外部服务日志
这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-7] EXTERNAL SERVICE LOG {"id": 20} http-nio-8080-exec-7
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-2] EXTERNAL SERVICE LOG {"id": 27} http-nio-8080-exec-2
2020-05-08 11:53:56.659 INFO 28223 --- [nio-8080-exec-6] EXTERNAL SERVICE LOG {"id": 19} http-nio-8080-exec-6
2020-05-08 11:53:56.659 INFO 28223 --- [io-8080-exec-10] EXTERNAL SERVICE LOG {"id": 23} http-nio-8080-exec-10
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-5] EXTERNAL SERVICE LOG {"id": 18} http-nio-8080-exec-5
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-9] EXTERNAL SERVICE LOG {"id": 17} http-nio-8080-exec-9
2020-05-08 11:53:56.660 INFO 28223 --- [nio-8080-exec-1] EXTERNAL SERVICE LOG {"id": 29} http-nio-8080-exec-1
2020-05-08 11:53:56.661 INFO 28223 --- [nio-8080-exec-4] EXTERNAL SERVICE LOG {"id": 24} http-nio-8080-exec-4
2020-05-08 11:53:56.666 INFO 28223 --- [io-8080-exec-11] EXTERNAL SERVICE LOG {"id": 25} http-nio-8080-exec-11
2020-05-08 11:53:56.675 INFO 28223 --- [io-8080-exec-13] EXTERNAL SERVICE LOG {"id": 42} http-nio-8080-exec-13
2020-05-08 11:53:56.678 INFO 28223 --- [io-8080-exec-14] EXTERNAL SERVICE LOG {"id": 28} http-nio-8080-exec-14
2020-05-08 11:53:56.680 INFO 28223 --- [io-8080-exec-12] EXTERNAL SERVICE LOG {"id": 26} http-nio-8080-exec-12
2020-05-08 11:53:56.686 INFO 28223 --- [io-8080-exec-15] EXTERNAL SERVICE LOG {"id": 22} http-nio-8080-exec-15
Run Code Online (Sandbox Code Playgroud)
更新 03 - 反应堆日志
加强,外部服务大约需要5秒响应。然而,可以看到几乎同时发出所有请求 (14)。
2020-05-08 11:53:56.051 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.053 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.081 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.082 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.093 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.094 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.095 INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1 : request(unbounded)
2020-05-08 11:53:56.110 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@40ddcd53)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@200e0819)
2020-05-08 11:53:56.112 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@3b81eee2)
2020-05-08 11:53:56.113 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@60af2a4d)
2020-05-08 11:53:56.115 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@723db553)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@387743b5)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@62ed2f8d)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onNext(@1a40554a)
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onNext(@1bcb696a)
2020-05-08 11:53:56.440 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@46c98823)
2020-05-08 11:53:56.443 INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.446 INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.442 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onNext(@1c0da4a)
2020-05-08 11:53:56.448 INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.452 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onNext(@14d54d26)
2020-05-08 11:53:56.453 INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1 : onComplete()
2020-05-08 11:53:56.490 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@46e43af)
2020-05-08 11:53:56.492 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onNext(@5ca02355)
2020-05-08 11:53:56.496 INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1 : onComplete()
Run Code Online (Sandbox Code Playgroud)
您可以使用ParallelFlux#flatMap(Function<? super T,? extends Publisher<? extends R>>, boolean, int)方法来控制并发。
对于您的情况,可能是:
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r)),
false, 1
)
Run Code Online (Sandbox Code Playgroud)
但是,实际上,您不必创建ParallelFlux. 只需使用Flux#flatMap(Function<? super T,? extends Publisher<? extends V>>, int)方法:
Flux.fromIterable(dataListWithHundredsElements)
.flatMap(element -> webclient.post()..., 5)
...
Run Code Online (Sandbox Code Playgroud)
该flatMap方法的第二个参数负责并发。
| 归档时间: |
|
| 查看次数: |
1346 次 |
| 最近记录: |