我有以下代码,它使用 WebClient 进行 HTTP 调用。
webClient.post()
.uri("/users/track")
.body(BodyInserters.fromObject(getUserTrackPayload(selection, customAttribute, partyId).toString()))
.header(CONTENT_TYPE, APPLICATION_JSON)
.retrieve()
.onStatus(httpStatus -> !CREATED.equals(httpStatus),
response -> response.bodyToMono(String.class)
.flatMap(body -> buildErrorMessage(response.statusCode().value(), body, partyId,
customAttribute)
.flatMap(e -> Mono.error(new MyException(e)))))
.bodyToMono(Object.class)
.map(o -> (JsonObject)new Gson().toJsonTree(o))
.flatMap(body -> body.get("message") != null && body.get("message").getAsString().equalsIgnoreCase("success")
&& body.get("attributes_processed") != null && body.get("attributes_processed").getAsInt() == 1
? Mono.just(body)
: buildErrorMessage(CREATED.value(), body.toString(), partyId, customAttribute)
.flatMap(e -> Mono.error(new MyException(e))));
Run Code Online (Sandbox Code Playgroud)
一段时间后(比如 10 分钟)第一次调用此代码时,我收到以下日志。但是,调用成功并输出了正确的结果。
io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)
2019-03-19 03:11:45,625 WARN [:::] [reactor-http-epoll-8] reactor.netty.http.client.HttpClientConnect : …Run Code Online (Sandbox Code Playgroud) 我需要实现以下行为:
429 Too many requests,则最多重试 3 次,延迟 1 秒我想使用 Spring WebClient 来实现此目的,并提出了以下代码:
Mono<ClientResponse> response = webClient.post()
.uri(URI.create("/myuri"))
.body(BodyInserters.fromObject(request))
.retrieve()
.onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
response -> Mono.error(new TooManyRequestsException("System is overloaded")))
.bodyToMono(ClientResponse.class)
.retryWhen(Retry.anyOf(TooManyRequestsException.class)
.fixedBackoff(Duration.ofSeconds(1)).retryMax(3))
.doOnError(throwable -> saveToDB(some_id, throwable))
.subscribe(response -> logResponse(some_id, response));
Run Code Online (Sandbox Code Playgroud)
现在我想测试重试机制和错误处理是否按我的预期工作。也许我可以使用StepVerifier来达到此目的,但我只是不知道如何在我的情况下使用它。有什么有用的提示吗?
reactive-programming rx-java project-reactor spring-webclient
我们正在一步一步地脱离 spring-cloud Netflix OSS 生态系统。目前我们正在实现 spring-cloud-loadbalancer 并删除 Ribbon。然而,我们过去在集成测试中有很多静态服务,现在随着从功能区转向 spring-cloud-loadbalancer,这些属性不再被选取。IE:
foo-service.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
foo-service.ribbon.listOfServers=localhost:9876
Run Code Online (Sandbox Code Playgroud)
我们已通过以下方式迁移到使用 spring-cloud-loadbalancer
首先,我们使用 @LoadBalanced 注释我们的 Webclient.Builder,如下所示
@Bean
@LoadBalanced
fun webClientBuilder() = WebClient.builder()
Run Code Online (Sandbox Code Playgroud)
然后我们在客户端类上添加 @LoadBalancerClient 注释,如下所示
@LoadBalancerClient(name = "foo-service", configuration = [FooServiceConfiguration::class])
class FooServiceClient(private val basicAuthWebClient: WebClient)
Run Code Online (Sandbox Code Playgroud)
这会导致我们的测试失败,并出现 foo-service 的 UnknownHostException。
现在我的问题是我们如何在新的 spring-cloud-loadbalancer 中配置这个静态服务器列表?
我的 Spring Boot 应用程序想要使用 Webclient 发出 http 请求(XML 请求正文)并接收 XML 响应。因此,我使用 jackson-dataformat-xml 创建了另一个 Spring Boot 应用程序,并创建了一个端点来接收和返回 XML,如下所示。
spring-boot-version=2.2.5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@PostMapping(value = "/api",
consumes = MediaType.APPLICATION_XML_VALUE,
produces = MediaType.APPLICATION_XML_VALUE)
public ResponseEntity<MyXmlResponse> trip(@RequestBody MyXmlRequest request) throws Exception {
MyXmlResponse response = new MyXmlResponse();
response.setStatus("SUCCESS");
response.setTripID(request.getTripID());
return ResponseEntity.ok().body(response);
}
Run Code Online (Sandbox Code Playgroud)
它工作完美,显然不需要 JaxB 注释,因为我使用 jackson-dataformat-xml。此外,请求 XML 可以不区分大小写。
现在,在我的第一个应用程序中,我想通过 Web 客户端使用此 API。我读到 Spring webflux 还不支持 Jackson-dataformat-xml 。因此我必须使用 Jaxb 注释来注释我的类。
spring-boot-version=2.2.5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
webClient.post() …Run Code Online (Sandbox Code Playgroud) jaxb spring-boot jackson-dataformat-xml spring-webflux spring-webclient
我想动态更改客户端注册的范围。我知道如何以这种方式设置注册:
spring:
security:
oauth2:
client:
registration:
custom:
client-id: clientId
client-secret: clientSecret
authorization-grant-type: client_credentials
provider:
custom:
token-uri: http://localhost:8081/oauth/token
Run Code Online (Sandbox Code Playgroud)
我如何以编程方式配置它?
spring spring-security oauth-2.0 spring-boot spring-webclient
在将一个项目升级到 spring webflux 5.3.3 时,我注意到 Webclient.exchange 方法已被弃用(链接)。
我已经阅读了问题Spring WebFlux 5.3.0 - WebClient.exchangeToMono()并且我有一个关于使用 DataBuffer 以及如何在 .exchangeToMono() 中使用它们的问题。
到目前为止,我了解到新方法 WebClient.exchangeToMono() 和 .exchangeToFlux() 强制开发人员应该处理其中的请求主体,因为 spring webflux 的下一步是释放整个响应主体。
为了使我的基本问题更加明显,让我们假设我们想要创建一个代理,将所有内容传递给调用者。这可能看起来像:
.exchangeToMono { clientResp ->
val statusCode = clientResponse.statusCode()
val respHeaders = clientResponse.headers().asHttpHeaders()
val body = clientResponse.body(BodyExtractors.toDataBuffers())
.doOnEach {
if (it.isOnComplete || it.isOnError) {
it.get()?.let { buffer ->
DataBufferUtils.release(buffer)
}
}
}
ServerResponse.status(statusCode)
.headers { headers -> headers.addAll(respHeaders) }
.body(BodyInserters.fromDataBuffers(body))
}
Run Code Online (Sandbox Code Playgroud)
但是,此代码片段不起作用,因为只有在有人订阅整个链时才会读取正文,但在 .exchangeToMono() 方法之后,响应正文将立即被释放,从而导致which.releaseIfNotConsumed()调用。response.releaseBody()body(BodyExtractors.toDataBuffers()).map(DataBufferUtils::release)
所以我的问题是,如果我们不想将完整的响应正文加载到内存中,这样的示例会是什么样子。有人可以指出我正确的方向吗?
我的 ActiveMQ 中有 579,000 条待处理消息,我需要将这些消息发布到 REST API。在此过程中,我以每秒 3000 次异步点击的速度访问 REST API。一段时间后,我开始为每个请求不断收到下面提到的异常:-
\n2021:06:10 05:25:40.002 [DefaultMessageListenerContainer-58047] [INFO] [com.main.consumer.AmazonMQConsumer] - Recieved TOPIC MESSAGE: DUMMY_MSG\n2021:06:10 05:25:40.004 [reactor-http-epoll-6] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM895fb, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n2021:06:10 05:25:40.173 [reactor-http-epoll-7] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMMf5600, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n2021:06:10 05:25:40.365 [reactor-http-epoll-9] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM9223f, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) …Run Code Online (Sandbox Code Playgroud) 在 Spring Boot 应用程序中,我用来WebClient调用对远程应用程序的 POST 请求。该方法目前如下所示:
// Class A
public void sendNotification(String notification) {
final WebClient webClient = WebClient.builder()
.defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
.build();
webClient.post()
.uri("http://localhost:9000/api")
.body(BodyInserters.fromValue(notification))
.retrieve()
.onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
.toBodilessEntity()
.block();
log.info("Notification delivered successfully");
}
// Class B
public void someOtherMethod() {
sendNotification("test");
}
Run Code Online (Sandbox Code Playgroud)
用例是:另一个类中的方法调用sendNotification并且应该处理任何错误,即任何非 2xx 状态或者甚至无法发送请求。
但我正在努力处理WebClient. 据我了解,以下行将捕获除 2xx/3xx 之外的任何 HTTP 状态,然后返回Mono.error带有NotificationException(自定义异常扩展Exception)的 a。
onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
但如何someOtherMethod()处理这种错误情况呢?它如何处理这个Mono.error?或者它实际上如何捕获NotificationExceptionifsendNotification甚至不将其放入签名中?
我们最近开始测试Spring 6 附带的新 HTTP 接口。
我们这样定义一个客户端:
HttpClient client = (HttpClient)((HttpClient)HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)).doOnConnected((conn) -> {
conn.addHandlerLast(new ReadTimeoutHandler(10000L, TimeUnit.MILLISECONDS));
});
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(client)).baseUrl(locationUrl.toExternalForm()).build();
HttpServiceProxyFactory factory = HttpServiceProxyFactory.builder(WebClientAdapter.forClient(webClient)).build();
Run Code Online (Sandbox Code Playgroud)
界面可以看起来像这样:
@GetExchange("/availability")
AvailabilityResponse getAvailability(@RequestParam("products") List<String> itemIds);
Run Code Online (Sandbox Code Playgroud)
在使用客户端时,我们偶尔会遇到如下异常。我们最初认为这与我们定义为 5 秒的连接超时有关,但这似乎与反应器阻塞有关,而不是与实际的 HTTP 调用有关。
在搜索类似问题时,我发现的所有内容似乎都与 WebClient 的单元测试有关,具有诸如设置之类的解决方案@AutoConfigureWebTestClient(timeout = "30000"),因此这对我们没有帮助。我们如何增加阻塞获取超时以匹配我们的 ReadTimeoutHandler 10 秒?
java.lang.IllegalStateException: Timeout on blocking read for 5000000000 NANOSECONDS
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:123)
at reactor.core.publisher.Mono.block(Mono.java:1734)
at org.springframework.web.service.invoker.HttpServiceMethod$ResponseFunction.execute(HttpServiceMethod.java:296)
at org.springframework.web.service.invoker.HttpServiceMethod.invoke(HttpServiceMethod.java:105)
at org.springframework.web.service.invoker.HttpServiceProxyFactory$HttpServiceMethodInterceptor.invoke(HttpServiceProxyFactory.java:271)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:218)
at jdk.proxy3/jdk.proxy3.$Proxy135.searchPharmacies(Unknown Source)
at <redacted>
at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) …Run Code Online (Sandbox Code Playgroud) 当我尝试构建项目时,我收到以下错误消息:
: Unsatisfied dependency expressed through method 'webClient' parameter 0;
nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException:
No qualifying bean of type
'org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository'
available: expected at least 1 bean which qualifies as autowire candidate.
Run Code Online (Sandbox Code Playgroud)
这是我的应用程序属性:
spring.security.oauth2.client.registration.eipo.authorization-grant-type=client_credentials
spring.security.oauth2.client.registration.eipo.token-uri=https://devapi.somedomain.co.xx/v1/auth/token
spring.security.oauth2.client.registration.eipo.client-id=randomClientId
spring.security.oauth2.client.registration.eipo.client-secret=T#%*sty%xp4^sdxb(e*
spring.main.web-application-type= reactive
Run Code Online (Sandbox Code Playgroud)
这是我的配置类:
@Configuration
public class WebClientConfig {
@Bean
WebClient webClient(ReactiveClientRegistrationRepository clientRegistrations) {
ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
new ServerOAuth2AuthorizedClientExchangeFilterFunction(
clientRegistrations,
new UnAuthenticatedServerOAuth2AuthorizedClientRepository());
oauth.setDefaultClientRegistrationId("eipo");
return WebClient.builder()
.filter(oauth)
.build();
}
}
Run Code Online (Sandbox Code Playgroud)
这是服务类别:
@Slf4j
@Service
public class SettlementService
@Autowired
private WebClient webClient;
public void getAuthThenGetResource(){
... //trying to …Run Code Online (Sandbox Code Playgroud) spring-webclient ×10
spring ×6
java ×4
spring-boot ×4
oauth-2.0 ×2
jaxb ×1
rx-java ×1
spring-cloud ×1