使用 spring 反应式 webClient 面临问题“WebClientRequestException:待处理的获取队列已达到其最大大小 1000”

raj*_*ehl 11 reactor-netty spring-webflux spring-webclient

我正在运行微服务 API 的负载,其中涉及使用 Spring Reactive Webclient 调用其他微服务 API。我正在使用 Postman runner 选项卡来测试这一点。

\n

首先,我以 1500 次迭代运行负载,每个请求都会调用第二个微服务,一切都按预期正常工作。\n但是当我以 5000 次迭代运行负载时,第二个微服务被调用 3500 次,并且调用次数为 1500 次。由于问题而失败

\n
\n

WebClientRequestException:待处理获取队列已达到其最大大小 1000

\n
\n

使用默认配置的 org.springframework.web.reactive.function.client.WebClient ,下面是代码片段。

\n
 private WebClient webClient;\n\n    @PostConstruct\n    public void init() {\n        this.webClient = WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE,  MediaType.APPLICATION_JSON_VALUE)\n                .build();\n    }\n
Run Code Online (Sandbox Code Playgroud)\n

可以采取什么措施来避免这种情况?

\n

我正在使用最新的 spring-boot-starter-parent 依赖项(版本 2.5.3)和 spring-webflux-5.3.9.jar jar。

\n

日志:

\n
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3\nCaused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3\n        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)\n        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)\n        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)\n        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)\n        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)\n        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)\n        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)\n        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)\n        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n        at java.base/java.lang.Thread.run(Thread.java:829)\n\n**Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000**\n        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)\n        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:\nError has been observed at the following site(s):\n        |_ checkpoint \xe2\x87\xa2 Request to POST http://172.20.0.2:3130/v1/login/mobile [DefaultWebClient]\nStack trace:\n                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)\n                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)\n                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)\n                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)\n                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)\n                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)\n                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)\n                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)\n                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)\n                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)\n                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)\n                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)\n                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)\n                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)\n                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)\nat reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)\n                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)\n                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)\n                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)\n                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)\n                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)\n                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)\n                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)\n                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:444)\n                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)\n                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)\n                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)\n                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)\n                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)\n                at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)\n                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)\n                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)\n                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)\n                at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)\n                at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)\n                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)\n                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)\n                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n                at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:216)\n                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:269)\n                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)\n                at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861)\n                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)\n                at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)\n                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)\n                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)\n                at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)\n                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)\n                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)\n                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)\n                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)\n                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n                at java.base/java.lang.Thread.run(Thread.java:829)\n**Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000\n        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)**\n        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)\n        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)\n        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)\n        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)\n        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)\n        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)\n        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)\n        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)\n        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)\n        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)\n
Run Code Online (Sandbox Code Playgroud)\n

Mic*_*yen 24

WebClient 需要一个 HTTP 客户端库来执行请求,默认情况下它使用 Reactor Netty。

\n

引用自Reactor-netty 参考文档

\n
\n

默认情况下,Reactor Netty 客户端使用 \xe2\x80\x9cfixed\xe2\x80\x9d 连接池,\n500 作为活动通道的最大数量,1000 作为允许保留在连接中的进一步通道获取尝试的最大数量。 \n待定状态(对于其余配置,请检查系统\n属性或下面的构建器配置)。这意味着,如果有人尝试获取通道,\n只要创建的通道少于 500 个并由池管理,\n实现就会创建一个新通道。当达到池中通道的最大数量时,\n最多 1000 次获取通道的新尝试将被延迟(挂起),\n直到通道再次返回到池中,\n进一步的尝试将因错误而被拒绝。

\n
\n

您所看到的是,您正在积极使用连接池中的所有 500 个连接,并且您已经用 1000 个待处理请求填满了“待处理”队列。

\n

您有 2 个选择来解决此问题

\n

垂直缩放

\n

增加连接池大小和/或获取队列长度

\n
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")\n        .maxConnections(<your_desired_max_connections>)\n        .pendingAcquireMaxCount(<your_desired_pending_queue_size>)\n        .build();\nReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));\nWebClient.builder()\n        .clientConnector(clientHttpConnector)\n        .build();\n
Run Code Online (Sandbox Code Playgroud)\n

水平缩放

\n

创建应用程序的其他实例并在实例之间平衡 api 调用的负载。

\n

Spring 参考文档

\n

附加说明:

\n

在计算连接池的大小时,值得考虑下游 api 调用的延迟。一个好的起点是

\n
\n

连接池大小 = tps *下游 API 延迟

\n
\n

tps(每秒事务数)

\n