如何使用Spring WebClient进行同步调用?

Ope*_*oad 4 java spring rest-client spring-boot spring-webclient

Spring FrameworkRestTemplate文档中有注释:

\n
\n

注意:截至5.0此类处于维护模式,仅接受较小的\n更改请求和错误。请考虑使用org.springframework.web.reactive.client.WebClient具有更现代的 API 并支持同步、异步和流式传输的方案。

\n
\n

当我尝试使用WebClient并进行同步调用时,如下所示:

\n
@RestController\n@RequestMapping("/rating")\npublic class Controller {\n\n    @Autowired\n    private RestTemplate restTemplate;\n\n    @Autowired\n    private WebClient.Builder webClientBuilder;\n\n    @RequestMapping("/{userId}")\n    public UserRating getUserRating(@PathVariable("userId") String userId) {\n\n//      return restTemplate.getForObject("http://localhost:8083/ratingsdata/user/" + userId, UserRating.class);\n        return webClientBuilder.build()\n                .get().uri("http://localhost:8083/ratingsdata/user/" + userId)\n                .retrieve()\n                .bodyToMono(UserRating.class)\n                .block();\n\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

项目pom.xml:

\n
<?xml version="1.0" encoding="UTF-8"?>\n<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\n         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">\n    <modelVersion>4.0.0</modelVersion>\n    <parent>\n        <groupId>org.springframework.boot</groupId>\n        <artifactId>spring-boot-starter-parent</artifactId>\n        <version>2.4.0</version>\n        <relativePath/> <!-- lookup parent from repository -->\n    </parent>\n    <groupId>com.daren.tutorial.movie</groupId>\n    <artifactId>catalog</artifactId>\n    <version>0.0.1-SNAPSHOT</version>\n    <name>catalog</name>\n    <description>Movie catalog</description>\n\n    <properties>\n        <java.version>11</java.version>\n    </properties>\n\n    <dependencies>\n        <dependency>\n            <groupId>org.springframework.boot</groupId>\n            <artifactId>spring-boot-starter-webflux</artifactId>\n        </dependency>\n\n        <dependency>\n            <groupId>org.springframework.boot</groupId>\n            <artifactId>spring-boot-starter-test</artifactId>\n            <scope>test</scope>\n        </dependency>\n        <dependency>\n            <groupId>io.projectreactor</groupId>\n            <artifactId>reactor-test</artifactId>\n            <scope>test</scope>\n        </dependency>\n    </dependencies>\n\n    <build>\n        <plugins>\n            <plugin>\n                <groupId>org.springframework.boot</groupId>\n                <artifactId>spring-boot-maven-plugin</artifactId>\n            </plugin>\n        </plugins>\n    </build>\n\n</project>\n
Run Code Online (Sandbox Code Playgroud)\n

我收到以下错误:

\n
    2020-11-15 10:40:15.359 ERROR 7175 --- [or-http-epoll-2] a.w.r.e.AbstractErrorWebExceptionHandler : [8776f4a6-1]  500 Server Error for HTTP GET "/rating/15"\n\njava.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2\n    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]\n    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nError has been observed at the following site(s):\n    |_ checkpoint \xe2\x87\xa2 HTTP GET "/rating/15" [ExceptionHandlingWebHandler]\nStack trace:\n        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Mono.block(Mono.java:1679) ~[reactor-core-3.4.0.jar:3.4.0]\n        at com.daren.tutorial.movie.catalog.resources.Controller.getUserRating(Controller.java:39) ~[classes/:na]\n        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]\n        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]\n        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]\n        at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]\n        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.3.1.jar:5.3.1]\n        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:191) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Mono.subscribe(Mono.java:3972) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]\n        at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:611) ~[reactor-netty-http-1.0.1.jar:1.0.1]\n        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:612) ~[reactor-netty-core-1.0.1.jar:1.0.1]\n        at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:453) ~[reactor-netty-core-1.0.1.jar:1.0.1]\n        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:510) ~[reactor-netty-http-1.0.1.jar:1.0.1]\n        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.1.jar:1.0.1]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:185) ~[reactor-netty-http-1.0.1.jar:1.0.1]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]\n        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]\n        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]\n        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]\n        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]\n        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]\n
Run Code Online (Sandbox Code Playgroud)\n

使用注释行 - 带有RestTemplate对象的行 - 应用程序可以正常工作。\n我是否必须使用 deprecatedRestTemplate来进行同步休息调用?

\n

Ope*_*oad 5

我找到了问题原因并解决了:

因为类路径上只有webflux,所以应用程序在NettyWeb 服务器上启动。此配置中的请求在reactor-http-epoll-2线程上处理,该线程不是reactor.core.scheduler.NonBlocking. 它引起了IllegalStateException.

为了解决这个问题,我必须添加这个依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)

现在,应用程序默认启动Tomcat,处理请求的线程http-nio-8081-exec-1NonBlocking.