标签: spring-webclient

Spring webflux WebClient 记录“连接由对等方重置”

我有以下代码,它使用 WebClient 进行 HTTP 调用。

        webClient.post()
                 .uri("/users/track")
                 .body(BodyInserters.fromObject(getUserTrackPayload(selection, customAttribute, partyId).toString()))
                 .header(CONTENT_TYPE, APPLICATION_JSON)
                 .retrieve()
                 .onStatus(httpStatus -> !CREATED.equals(httpStatus),
                           response -> response.bodyToMono(String.class)
                                               .flatMap(body -> buildErrorMessage(response.statusCode().value(), body, partyId,
                                                                                  customAttribute)
                                                   .flatMap(e -> Mono.error(new MyException(e)))))
                 .bodyToMono(Object.class)
                 .map(o -> (JsonObject)new Gson().toJsonTree(o))
                 .flatMap(body -> body.get("message") != null && body.get("message").getAsString().equalsIgnoreCase("success")
                                  && body.get("attributes_processed") != null && body.get("attributes_processed").getAsInt() == 1
                     ? Mono.just(body)
                     : buildErrorMessage(CREATED.value(), body.toString(), partyId, customAttribute)
                         .flatMap(e -> Mono.error(new MyException(e))));
Run Code Online (Sandbox Code Playgroud)

一段时间后(比如 10 分钟)第一次调用此代码时,我收到以下日志。但是,调用成功并输出了正确的结果。

io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)
2019-03-19 03:11:45,625 WARN  [:::] [reactor-http-epoll-8] reactor.netty.http.client.HttpClientConnect : …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming spring-webflux spring-webclient

6
推荐指数
1
解决办法
1万
查看次数

如何测试Spring WebClient何时重试?

我需要实现以下行为:

  • 发出 REST 发布请求
  • 如果响应返回状态为429 Too many requests,则最多重试 3 次,延迟 1 秒
  • 如果第三次重试失败或发生任何其他错误,请记录并向数据库写入内容
  • 如果请求成功(http status 200),记录一些信息

我想使用 Spring WebClient 来实现此目的,并提出了以下代码:

Mono<ClientResponse> response = webClient.post()
            .uri(URI.create("/myuri"))
            .body(BodyInserters.fromObject(request))
            .retrieve()
            .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS), 
                      response -> Mono.error(new TooManyRequestsException("System is overloaded")))
            .bodyToMono(ClientResponse.class)
            .retryWhen(Retry.anyOf(TooManyRequestsException.class)
                                          .fixedBackoff(Duration.ofSeconds(1)).retryMax(3))
            .doOnError(throwable -> saveToDB(some_id, throwable))
            .subscribe(response -> logResponse(some_id, response));
Run Code Online (Sandbox Code Playgroud)

现在我想测试重试机制和错误处理是否按我的预期工作。也许我可以使用StepVerifier来达到此目的,但我只是不知道如何在我的情况下使用它。有什么有用的提示吗?

reactive-programming rx-java project-reactor spring-webclient

6
推荐指数
1
解决办法
1万
查看次数

spring-cloud-loabalancer 配置静态服务器列表

我们正在一步一步地脱离 spring-cloud Netflix OSS 生态系统。目前我们正在实现 spring-cloud-loadbalancer 并删除 Ribbon。然而,我们过去在集成测试中有很多静态服务,现在随着从功能区转向 spring-cloud-loadbalancer,这些属性不再被选取。IE:

foo-service.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
foo-service.ribbon.listOfServers=localhost:9876
Run Code Online (Sandbox Code Playgroud)

我们已通过以下方式迁移到使用 spring-cloud-loadbalancer
首先,我们使用 @LoadBalanced 注释我们的 Webclient.Builder,如下所示

@Bean
@LoadBalanced
fun webClientBuilder() = WebClient.builder()
Run Code Online (Sandbox Code Playgroud)

然后我们在客户端类上添加 @LoadBalancerClient 注释,如下所示

@LoadBalancerClient(name = "foo-service", configuration = [FooServiceConfiguration::class])
class FooServiceClient(private val basicAuthWebClient: WebClient)
Run Code Online (Sandbox Code Playgroud)

这会导致我们的测试失败,并出现 foo-service 的 UnknownHostException。

现在我的问题是我们如何在新的 spring-cloud-loadbalancer 中配置这个静态服务器列表?

spring spring-cloud spring-webflux spring-webclient

6
推荐指数
1
解决办法
2146
查看次数

Spring Boot WebClient XML

我的 Spring Boot 应用程序想要使用 Webclient 发出 http 请求(XML 请求正文)并接收 XML 响应。因此,我使用 jackson-dataformat-xml 创建了另一个 Spring Boot 应用程序,并创建了一个端点来接收和返回 XML,如下所示。

spring-boot-version=2.2.5
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
</dependency>

@PostMapping(value = "/api",
            consumes = MediaType.APPLICATION_XML_VALUE,
            produces = MediaType.APPLICATION_XML_VALUE)
public ResponseEntity<MyXmlResponse> trip(@RequestBody MyXmlRequest request) throws Exception {
   MyXmlResponse response = new MyXmlResponse();
   response.setStatus("SUCCESS");
   response.setTripID(request.getTripID());
   return ResponseEntity.ok().body(response);
}
Run Code Online (Sandbox Code Playgroud)

它工作完美,显然不需要 JaxB 注释,因为我使用 jackson-dataformat-xml。此外,请求 XML 可以不区分大小写。

现在,在我的第一个应用程序中,我想通过 Web 客户端使用此 API。我读到 Spring webflux 还不支持 Jackson-dataformat-xml 。因此我必须使用 Jaxb 注释来注释我的类。

spring-boot-version=2.2.5
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

webClient.post() …
Run Code Online (Sandbox Code Playgroud)

jaxb spring-boot jackson-dataformat-xml spring-webflux spring-webclient

6
推荐指数
0
解决办法
7067
查看次数

使用 Spring Security 5 OAuth2.0 动态配置客户端注册范围

我想动态更改客户端注册的范围。我知道如何以这种方式设置注册:

spring:
  security:
    oauth2:
      client:
        registration:
          custom:
            client-id: clientId
            client-secret: clientSecret
            authorization-grant-type: client_credentials
        provider:
          custom:
            token-uri: http://localhost:8081/oauth/token
Run Code Online (Sandbox Code Playgroud)

我如何以编程方式配置它?

spring spring-security oauth-2.0 spring-boot spring-webclient

6
推荐指数
1
解决办法
2867
查看次数

Spring Webflux 使用 DataBuffer 将 WebClient.exchange() 迁移到 WebClient.exchangeToMono()

在将一个项目升级到 spring webflux 5.3.3 时,我注意到 Webclient.exchange 方法已被弃用(链接)。

我已经阅读了问题Spring WebFlux 5.3.0 - WebClient.exchangeToMono()并且我有一个关于使用 DataBuffer 以及如何在 .exchangeToMono() 中使用它们的问题。

到目前为止,我了解到新方法 WebClient.exchangeToMono() 和 .exchangeToFlux() 强制开发人员应该处理其中的请求主体,因为 spring webflux 的下一步是释放整个响应主体。

为了使我的基本问题更加明显,让我们假设我们想要创建一个代理,将所有内容传递给调用者。这可能看起来像:

.exchangeToMono { clientResp ->
   val statusCode = clientResponse.statusCode()
   val respHeaders = clientResponse.headers().asHttpHeaders()
   val body = clientResponse.body(BodyExtractors.toDataBuffers())
      .doOnEach {
         if (it.isOnComplete || it.isOnError) {
            it.get()?.let { buffer ->
               DataBufferUtils.release(buffer)
            }
         }
      }
   ServerResponse.status(statusCode)
      .headers { headers -> headers.addAll(respHeaders) }
      .body(BodyInserters.fromDataBuffers(body))
}
Run Code Online (Sandbox Code Playgroud)

但是,此代码片段不起作用,因为只有在有人订阅整个链时才会读取正文,但在 .exchangeToMono() 方法之后,响应正文将立即被释放,从而导致which.releaseIfNotConsumed()调用。response.releaseBody()body(BodyExtractors.toDataBuffers()).map(DataBufferUtils::release)

所以我的问题是,如果我们不想将完整的响应正文加载到内存中,这样的示例会是什么样子。有人可以指出我正确的方向吗?

spring-webflux spring-webclient

6
推荐指数
0
解决办法
2723
查看次数

如何在WebClient Springboot中处理“io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection Reset by Peer”

我的 ActiveMQ 中有 579,000 条待处理消息,我需要将这些消息发布到 REST API。在此过程中,我以每秒 3000 次异步点击的速度访问 REST API。一段时间后,我开始为每个请求不断收到下面提到的异常:-

\n
2021:06:10 05:25:40.002 [DefaultMessageListenerContainer-58047] [INFO] [com.main.consumer.AmazonMQConsumer] - Recieved TOPIC MESSAGE: DUMMY_MSG\n2021:06:10 05:25:40.004 [reactor-http-epoll-6] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM895fb, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n2021:06:10 05:25:40.173 [reactor-http-epoll-7] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMMf5600, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n2021:06:10 05:25:40.365 [reactor-http-epoll-9] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM9223f, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error\nio.netty.channel.unix.Errors$NativeIoException: readAddress(..) …
Run Code Online (Sandbox Code Playgroud)

java spring spring-boot spring-webflux spring-webclient

6
推荐指数
0
解决办法
2万
查看次数

在另一种方法中处理来自 Spring WebClient 的错误

在 Spring Boot 应用程序中,我用来WebClient调用对远程应用程序的 POST 请求。该方法目前如下所示:

// Class A
public void sendNotification(String notification) {
    final WebClient webClient = WebClient.builder()
            .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .build();
    webClient.post()
            .uri("http://localhost:9000/api")
            .body(BodyInserters.fromValue(notification))
            .retrieve()
            .onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))
            .toBodilessEntity()
            .block();
    log.info("Notification delivered successfully");
}

// Class B
public void someOtherMethod() {
    sendNotification("test");
}
Run Code Online (Sandbox Code Playgroud)

用例是:另一个类中的方法调用sendNotification并且应该处理任何错误,即任何非 2xx 状态或者甚至无法发送请求。

但我正在努力处理WebClient. 据我了解,以下行将捕获除 2xx/3xx 之外的任何 HTTP 状态,然后返回Mono.error带有NotificationException(自定义异常扩展Exception)的 a。

onStatus(HttpStatus::isError, clientResponse -> Mono.error(NotificationException::new))

但如何someOtherMethod()处理这种错误情况呢?它如何处理这个Mono.error?或者它实际上如何捕获NotificationExceptionifsendNotification甚至不将其放入签名中?

spring spring-webclient

6
推荐指数
1
解决办法
4万
查看次数

使用新的 Spring 6 HTTP 接口时“阻塞读取超时”

我们最近开始测试Spring 6 附带的新 HTTP 接口。

我们这样定义一个客户端:

    HttpClient client = (HttpClient)((HttpClient)HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)).doOnConnected((conn) -> {
        conn.addHandlerLast(new ReadTimeoutHandler(10000L, TimeUnit.MILLISECONDS));
    });
    WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(client)).baseUrl(locationUrl.toExternalForm()).build();
    HttpServiceProxyFactory factory = HttpServiceProxyFactory.builder(WebClientAdapter.forClient(webClient)).build();
Run Code Online (Sandbox Code Playgroud)

界面可以看起来像这样:

@GetExchange("/availability")
    AvailabilityResponse getAvailability(@RequestParam("products") List<String> itemIds);
Run Code Online (Sandbox Code Playgroud)

在使用客户端时,我们偶尔会遇到如下异常。我们最初认为这与我们定义为 5 秒的连接超时有关,但这似乎与反应器阻塞有关,而不是与实际的 HTTP 调用有关。

在搜索类似问题时,我发现的所有内容似乎都与 WebClient 的单元测试有关,具有诸如设置之类的解决方案@AutoConfigureWebTestClient(timeout = "30000"),因此这对我们没有帮助。我们如何增加阻塞获取超时以匹配我们的 ReadTimeoutHandler 10 秒?

java.lang.IllegalStateException: Timeout on blocking read for 5000000000 NANOSECONDS
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:123)
    at reactor.core.publisher.Mono.block(Mono.java:1734)
    at org.springframework.web.service.invoker.HttpServiceMethod$ResponseFunction.execute(HttpServiceMethod.java:296)
    at org.springframework.web.service.invoker.HttpServiceMethod.invoke(HttpServiceMethod.java:105)
    at org.springframework.web.service.invoker.HttpServiceProxyFactory$HttpServiceMethodInterceptor.invoke(HttpServiceProxyFactory.java:271)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:218)
    at jdk.proxy3/jdk.proxy3.$Proxy135.searchPharmacies(Unknown Source)
    at <redacted>
    at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) …
Run Code Online (Sandbox Code Playgroud)

java spring spring-boot spring-webclient

6
推荐指数
1
解决办法
2646
查看次数

没有可用的“org.springframework.security.oauth2.client.registration.ClientRegistrationRepository”类型的合格 bean

当我尝试构建项目时,我收到以下错误消息:

: Unsatisfied dependency expressed through method 'webClient' parameter 0; 
nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: 
No qualifying bean of type 
'org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository' 
available: expected at least 1 bean which qualifies as autowire candidate.
Run Code Online (Sandbox Code Playgroud)

这是我的应用程序属性:

spring.security.oauth2.client.registration.eipo.authorization-grant-type=client_credentials
spring.security.oauth2.client.registration.eipo.token-uri=https://devapi.somedomain.co.xx/v1/auth/token
spring.security.oauth2.client.registration.eipo.client-id=randomClientId
spring.security.oauth2.client.registration.eipo.client-secret=T#%*sty%xp4^sdxb(e*
spring.main.web-application-type= reactive

Run Code Online (Sandbox Code Playgroud)

这是我的配置类:

@Configuration
public class WebClientConfig {
  @Bean
  WebClient webClient(ReactiveClientRegistrationRepository clientRegistrations) {
      ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
        new ServerOAuth2AuthorizedClientExchangeFilterFunction(
          clientRegistrations,
          new UnAuthenticatedServerOAuth2AuthorizedClientRepository());
      oauth.setDefaultClientRegistrationId("eipo"); 
      return WebClient.builder()
        .filter(oauth)
        .build();
  }
}

Run Code Online (Sandbox Code Playgroud)

这是服务类别:


@Slf4j
@Service
public class SettlementService
  @Autowired
  private WebClient webClient;

  public void getAuthThenGetResource(){
    ... //trying to …
Run Code Online (Sandbox Code Playgroud)

java spring oauth-2.0 spring-webclient

6
推荐指数
1
解决办法
6853
查看次数