标签: reactor-netty

Spring 5 webflux如何在Webclient上设置超时

我正在尝试在我的WebClient上设置超时,这是当前代码:

SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

ClientHttpConnector httpConnector = new ReactorClientHttpConnector(opt -> {
    opt.sslContext(sslContext);
    HttpClientOptions option = HttpClientOptions.builder().build();
    opt.from(option);
});
return WebClient.builder().clientConnector(httpConnector).defaultHeader("Authorization", xxxx)
                .baseUrl(this.opusConfig.getBaseURL()).build();
Run Code Online (Sandbox Code Playgroud)

我需要添加超时和汇集策略,我正在考虑这样的事情:

PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(this.applicationConfig.getHttpClientMaxPoolSize());
cm.setDefaultMaxPerRoute(this.applicationConfig.getHttpClientMaxPoolSize());
cm.closeIdleConnections(this.applicationConfig.getServerIdleTimeout(), TimeUnit.MILLISECONDS);

RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(this.applicationConfig.getHttpClientSocketTimeout())
        .setConnectTimeout(this.applicationConfig.getHttpClientConnectTimeout())
        .setConnectionRequestTimeout(this.applicationConfig.getHttpClientRequestTimeout()).build();

CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfig).setConnectionManager(cm).build();
Run Code Online (Sandbox Code Playgroud)

但我无法弄清楚如何在我的webclient中设置httpClient

spring reactor reactor-netty spring-webflux

15
推荐指数
4
解决办法
1万
查看次数

使用spring-webflux WebClient的反应堆净值配置HostnameVerifier

我正在尝试使用ssl和客户端主机名验证来配置spring-webflux WebClient(具有底层的Reactor Netty)。我提供了javax.net.ssl.SSLContext,HostnameVerifier和受信任的主机名列表(作为字符串列表)。

到目前为止,我已经使用SSLContext配置了WebClient,但是找不到配置主机名验证的方法。

陈述我的问题:我有一组信任的服务主机名(字符串列表)和一个HostnameVerifier。我想用它配置我的WebClient。

是否可以使用javax.net.ssl.HostnameVerifier做到这一点?反应堆网络中是否有替代方法?

到目前为止,这是我得到的:

WebClient.builder()
  .clientConnector(
    new ReactorClientHttpConnector(
      opt -> opt.sslContext(new JdkSslContext(mySSLContext, 
                      true, ClientAuth.OPTIONAL))))
  .build();
Run Code Online (Sandbox Code Playgroud)

java ssl https reactor-netty spring-webflux

12
推荐指数
1
解决办法
855
查看次数

使用 spring 反应式 webClient 面临问题“WebClientRequestException:待处理的获取队列已达到其最大大小 1000”

我正在运行微服务 API 的负载,其中涉及使用 Spring Reactive Webclient 调用其他微服务 API。我正在使用 Postman runner 选项卡来测试这一点。

\n

首先,我以 1500 次迭代运行负载,每个请求都会调用第二个微服务,一切都按预期正常工作。\n但是当我以 5000 次迭代运行负载时,第二个微服务被调用 3500 次,并且调用次数为 1500 次。由于问题而失败

\n
\n

WebClientRequestException:待处理获取队列已达到其最大大小 1000

\n
\n

使用默认配置的 org.springframework.web.reactive.function.client.WebClient ,下面是代码片段。

\n
 private WebClient webClient;\n\n    @PostConstruct\n    public void init() {\n        this.webClient = WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE,  MediaType.APPLICATION_JSON_VALUE)\n                .build();\n    }\n
Run Code Online (Sandbox Code Playgroud)\n

可以采取什么措施来避免这种情况?

\n

我正在使用最新的 spring-boot-starter-parent 依赖项(版本 2.5.3)和 spring-webflux-5.3.9.jar jar。

\n

日志:

\n
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3\nCaused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3\n        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)\n        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)\n        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)\n        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)\n        at …
Run Code Online (Sandbox Code Playgroud)

reactor-netty spring-webflux spring-webclient

11
推荐指数
1
解决办法
2万
查看次数

通过Spring Websocket STOMP打开连接会导致我们的服务器死机

所以我们在后端使用Spring websocket STOMP + RabbitMQ,我们遇到了打开文件描述符的麻烦.经过一段时间后,我们达到了服务器的限制,服务器不接受任何连接,包括websockets和API端点.

2018-09-14 18:04:13.605  INFO 1288 --- [MessageBroker-1] 
o.s.w.s.c.WebSocketMessageBrokerStats    : WebSocketSession[2 current WS(2)- 
HttpStream(0)-HttpPoll(0), 1159 total, 0 closed abnormally (0 connect 
failure, 0 send limit, 63 transport error)], stompSubProtocol[processed 
CONNECT(1014)-CONNECTED(1004)-DISCONNECT(0)], stompBrokerRelay[9 sessions, 
127.0.0.1:61613 (available), processed CONNECT(1015)-CONNECTED(1005)- 
DISCONNECT(1011)], inboundChannel[pool size = 2, active threads = 2, queued 
tasks = 2, completed tasks = 12287], outboundChannelpool size = 0, active 
threads = 0, queued tasks = 0, completed tasks = 4225], sockJsScheduler[pool 
size = 1, active threads = 1, …
Run Code Online (Sandbox Code Playgroud)

epoll stomp netty spring-websocket reactor-netty

10
推荐指数
1
解决办法
1387
查看次数

何时使用 Mono<List<Object>> 以及何时使用 Flux<Object> 作为 RestController 方法

我正在将 Spring web-flux 与 Reactor 一起使用,但我不清楚 RestController 方法何时应该返回

Mono <List<Object>>Flux<Object>.

你能提供一些什么时候使用它们的例子吗?

spring spring-boot project-reactor reactor-netty spring-webflux

10
推荐指数
1
解决办法
4605
查看次数

由于我的基本 URI 不固定,在 Webflux 中一次又一次地创建 Webclient 是否明智?

在我的微服务中,我必须从地方获取数据。有些 URL 是固定的,但有些不是。如果我的基本 URL 发生变化,我是否需要一次又一次地创建 Webclient。如果不是,那么下面的方法是正确的来创建 Web 客户端。WebClient.create(); 后来每当我打电话时都会一次又一次地更改 URI。根据我的理解,创建 WebClient 必须是一些繁重的操作。

ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
                options -> options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, requestTimeout).compression(true)
                        .afterNettyContextInit(ctx ->  ctx.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS))));
        return WebClient.builder()
                        .clientConnector(connector)
                        .baseUrl(hostURL)
                        .build();
Run Code Online (Sandbox Code Playgroud)

project-reactor reactor-netty spring-webflux

10
推荐指数
1
解决办法
4227
查看次数

Zip 三种不同类型的单声道

我已经使用 Spring Webflux 开始了一个新项目,我对这种反应式编码范式还很陌生。所以对于像新手一样的提问提前道歉。

我的控制器方法返回响应Mono<ResponseEntity<String>>,我有三个不同的服务可以调用,从那里我得到三个不同的Mono对象 -

Mono<CustomObject> customMono = serivce1.method();
Mono<Boolean> booleanMono = service2.method();
Mono<String> stringMono = service3.method();
Run Code Online (Sandbox Code Playgroud)

所以为了准备响应(Mono<ResponseEntity<String>>),我需要做这样的事情 -

Mono.zip(customMono, booleanMono, stringMono, (customData, booleanData, stringData) -> {
------
return Mono.just(ResponseEntity.ok().body("-----"));
});
Run Code Online (Sandbox Code Playgroud)

问题是,没有这样的zip方法将 3Mono和一个函数作为参数。我已经找到了这个 - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#zip-reactor.core.publisher.Mono-reactor.core.publisher.Mono-java .util.function.BiFunction-

但它不能满足我的要求。所以我面临的问题

  • 我不能使用Mono.mergeWithMono.concaWith方法,因为我的 Mono 对象是不同类型的。
  • 我可以 flatMap/map 每个 Mono 并创建一个链。但我希望在单独的线程中并行调用 service2,因为它不依赖于 service1 调用。但是 service3 调用依赖于 service1 响应。

总之,我需要做的是:

  • 首先拨打 service1 电话
  • 在不同的线程上分别调用 service2
  • 进行依赖于 service1 呼叫数据的 service3 呼叫
  • Mono<ResponseEntity<String>>使用来自所有服务调用的数据生成最终响应对象 …

java reactor-netty spring-webflux

9
推荐指数
2
解决办法
1万
查看次数

Netty HttpClient - 响应超时与读取超时

HttpClientfrom 提供了各种要配置的“超时”,其中两个让我有点困惑:

  • 响应超时:This is time that takes to receive a response after sending a request
  • 读取超时处理程序:Raises a ReadTimeoutException when no data was read within a certain period of time

有人可以解释一下它们之间的主要区别吗?


通用场景 - 使用 spring reactive 进行调用,它在幕后WebClient使用:HttpClient

  1. 建立与远程服务器的连接>>>在这里我们利用ChannelOption.CONNECT_TIMEOUT_MILLIS
  2. TLS 握手>>> 这里我们利用,ReadTimeoutHandler我是对的吗?
  3. 发送请求
  4. ...等待...
  5. 接收响应>>> 此处哪个“超时”优先:响应/读取?

netty reactor-netty spring-webflux

9
推荐指数
1
解决办法
1万
查看次数

如何使反应式webclient遵循3XX重定向?

我创建了一个基本的REST控制器,它使用netty在Spring-boot 2中使用被动Webclient进行请求.

@RestController
@RequestMapping("/test")
@Log4j2
public class TestController {

    private WebClient client;

    @PostConstruct
    public void setup() {

        client = WebClient.builder()
                .baseUrl("http://www.google.com/")
                .exchangeStrategies(ExchangeStrategies.withDefaults())
                .build();
    }


    @GetMapping
    public Mono<String> hello() throws URISyntaxException {
        return client.get().retrieve().bodyToMono(String.class);
    }

}
Run Code Online (Sandbox Code Playgroud)

当我收到3XX响应代码时,我希望webclient使用响应中的Location来跟踪重定向,并递归调用该URI,直到我得到非3XX响应.

我得到的实际结果是3XX响应.

spring-boot project-reactor reactor-netty spring-webflux

8
推荐指数
2
解决办法
2517
查看次数

使用 Reactor Netty HTTP Client 时如何获取 HTTP 响应正文和状态

我在这里使用 Reactor Netty HTTP 客户端作为独立依赖项,即不是通过,spring-webflux因为我不想拖入 Spring 相关依赖项

从文档中可以看出,可以发出返回的请求HttpClientResponse

import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

public class Application {

    public static void main(String[] args) {
        HttpClientResponse response =
                HttpClient.create()                   
                          .get()                      
                          .uri("http://example.com/") 
                          .response()                 
                          .block();
    }
}
Run Code Online (Sandbox Code Playgroud)

事情HttpClientResponse只包含标题和状态。从这里的 Java 文档可以看出

也可以从示例中消耗数据来做到这一点

import reactor.netty.http.client.HttpClient;

public class Application {

    public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .get()
                          .uri("http://example.com/")
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }
}
Run Code Online (Sandbox Code Playgroud)

但这仅以字符串形式返回 http 实体数据。没有有关标头或状态代码的信息。

我现在遇到的问题是我需要发出请求并获得响应,该响应为我提供标头、状态等以及 http 响应正文。

我似乎无法找到如何。有什么想法吗?qw

reactor project-reactor reactor-netty

7
推荐指数
1
解决办法
5142
查看次数