标签: project-reactor

如何延迟重复的 WebClient 获取请求

我正在使用 Spring 5 WebClient 从 REST api 重复获取正在运行的进程的某些状态。

这里的帮助下,我现在来到了这个解决方案:

webClient.get().uri(...).retrieve.bodyToMono(State.class)
          .repeat()
          .skipUntil(state -> stateFinished())
          .limitRequest(1)
          .subscribe(state -> {...});
Run Code Online (Sandbox Code Playgroud)

虽然这有效,但 get 请求以非常高的速率被触发。将请求速率限制为每秒 1 个请求的正确方法是什么?

我尝试使用,delayElements(Duration.ofSeconds(1))但这只会延迟结果,而不是请求本身。

project-reactor spring-webflux

1
推荐指数
1
解决办法
4376
查看次数

弹性4J + Spring Boot 2.x

我在reative API spring boot应用程序中使用resilience4j进行容错。我可以看到,即使 Mono 返回错误,所有事件都被视为成功。

服务层

    @Override
    @CircuitBreaker(name = "member-service")
    public Mono<String> getMemberInfo(String memberId) {
        return wsClient.getMemberInfo(memberId); 
       // This call will return WSException whenever there is 4XX or 5XX error
    }
Run Code Online (Sandbox Code Playgroud)

application.yml配置

resilience4j.circuitbreaker:
  backends:
    member-service:
      ringBufferSizeInClosedState: 1
      ringBufferSizeInHalfOpenState: 2
      waitInterval: 10000
      failureRateThreshold: 75
      registerHealthIndicator: true
      recordExceptions:
        - com.xx.WSException
      ignoreExceptions:
        - com.xxx.WSClientException
Run Code Online (Sandbox Code Playgroud)

我故意更改了 URI 路径,以便 WebClient 始终返回 404 错误,从而引发 WSException。当我看到下面的端点时,类型总是成功。我错过了什么?

http://localhost:8088/circuitbreaker-events/member-service

{
"circuitBreakerEvents": [
{
"circuitBreakerName": "member-service",
"type": "SUCCESS",
"creationTime": "2019-02-18T21:56:21.588+05:30[Asia/Calcutta]",
"durationInMs": 6
},
{
"circuitBreakerName": "member-service",
"type": "SUCCESS", …
Run Code Online (Sandbox Code Playgroud)

circuit-breaker project-reactor spring-webflux resilience4j

1
推荐指数
1
解决办法
4636
查看次数

使用 Spring 5 下载 PDF 文件时出错

我尝试使用 Spring 5 下载 PDF 文件。以下是我的代码:

@RequestMapping(path = "/pdf", method = { RequestMethod.POST }, produces = MediaType.APPLICATION_PDF_VALUE)
public Mono<ResponseEntity<Resource>> getPDF(ServerHttpRequest httpRequest) 
{
    File file = new File(filepath);
    ResponseEntity<Resource> resource = getResource(file);
    return Mono.justOrEmpty(resource);
}

public ResponseEntity<Resource> getResource(File file) {
   final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
   HttpHeaders headers = new HttpHeaders();
   headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + file.getName());
   headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
   headers.add("Pragma", "no-cache");
   headers.add("Expires", "0");
   return ResponseEntity.ok().headers(headers).contentType(MediaType.APPLICATION_PDF).contentLength(file.length()).body(new InputStreamResource(inputStream));
}
Run Code Online (Sandbox Code Playgroud)

但我收到以下异常:

java.lang.NoSuchMethodError: reactor.core.publisher.Flux.doOnDiscard(Ljava/lang/Class;Ljava/util/function/Consumer;)Lreactor/core/publisher/Flux;

在 org.springframework.core.io.buffer.DataBufferUtils.readByteChannel(DataBufferUtils.java:105) 在 org.springframework.core.io.buffer.DataBufferUtils.read(DataBufferUtils.java:202) 在 org.springframework.core.io .buffer.DataBufferUtils.read(DataBufferUtils.java:170) at org.springframework.core.codec.ResourceEncoder.encode(ResourceEncoder.java:76) at …

java spring reactive-programming project-reactor

1
推荐指数
1
解决办法
201
查看次数

在 Vert.x 应用程序中使用 Project Reactor

我在 Vert.x 应用程序中使用了一个库,它返回Project Reactor类型Mono

我有一个 Verticle 接收这种反应类型,并打算通过事件总线将内容发送到另一个 Verticle:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    @Override
    public void start() throws Exception
    {
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000)) 
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body()); …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming vert.x project-reactor reactive-streams

1
推荐指数
1
解决办法
1004
查看次数

如何在 WebClient 上记录“真实”的 Http 标头

我目前正在使用ExchangeFilterFunction记录进入ClientRequest实例内的所有标头,我正在访问它们做request.headers().

执行我的过滤器后,HttpClient下面会添加某些标头,例如那个标头Accept-Encoding,因此不会被记录,因为它们永远不会被添加到ClientRequest实例中。

我的过滤器看起来像这样:

public class WebClientLoggingFilter implements ExchangeFilterFunction {

    @Override
    public Mono<ClientResponse> filter(final ClientRequest clientRequest, final ExchangeFunction next) {
         return Mono.just(clientRequest)
               .doOnNext(request -> log(request.headers().toString()))
               .flatMap(next::exchange)
               .doOnNext(clientResponse -> logData(clientRequestData, message, clientResponse));
  } 
}
Run Code Online (Sandbox Code Playgroud)

此过滤器将所有内容记录在ClientRequest标头中,但随后HttpClient会发挥其魔力,即使在响应返回之后,它也永远不会到达 ClientRequest。来自 Netty的代码示例

有没有其他方法可以进行日志记录,以便我可以访问真正通过网络发送的内容?

spring netty project-reactor reactor-netty spring-webflux

1
推荐指数
1
解决办法
934
查看次数

在使用 Spring Project Reactor 延迟背压后重试?

背景

我正在尝试使用Spring Project Reactor 3.3.0 版实现类似于简单的非阻塞速率限制器的东西。例如,要将数量限制为每秒 100 个请求,我使用以下实现:

myFlux
      .bufferTimeout(100, Duration.ofSeconds(1))
      .delayElements(Duration.ofSeconds(1))
      ..
Run Code Online (Sandbox Code Playgroud)

这适用于我的用例,但如果订阅者没有跟上myFlux发布者的速度,它会(正确地)抛出OverflowException

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
    reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
Run Code Online (Sandbox Code Playgroud)

在我的情况下,重要的是所有元素都被订阅者消耗,因此例如降低背压 ( onBackpressureDrop()) 是不可接受的。

有没有办法,而不是在背压下丢弃元素,只是暂停消息的发布,直到订阅者赶上?在我的情况下myFlux,发布了一个有限但大量的元素,这些元素持久存在于持久数据库中,因此恕我直言,不应该要求删除元素。

spring backpressure spring-boot project-reactor

1
推荐指数
1
解决办法
601
查看次数

我们如何迭代和打印来自 Reactor Flux 或 Mono FlatMap 或 FlatMapMany 的值?

使用 Spring Boot 学习 Reactor。

使用示例 API:

https://jsonplaceholder.typicode.com/todos/1
{
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}
Run Code Online (Sandbox Code Playgroud)

想要将上面的映射到一个对象(定义一个 pojo SingleUser)并打印输出。

private WebClient webClient = WebClient.create("https://jsonplaceholder.typicode.com");

private Mono<ClientResponse> responseMono = webClient.get()
          .uri("/todos/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange();

public String getResult() {
   return ">> result = " + responseMono.flatMap(res -> res.bodyToMono(String.class)).block();
}
Run Code Online (Sandbox Code Playgroud)

使用上述内容时..结果是:

>> result = {
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}
Run Code Online (Sandbox Code Playgroud)

使用 Flux 时如何迭代和打印所有值,如下所示?

public Flux<SingleUser> listUsers1() {
    return webClient.get()
             .uri("/todos/1")
             .retrieve()
             .bodyToFlux(SingleUser.class);
} …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot project-reactor

1
推荐指数
1
解决办法
3229
查看次数

如何避免 .flatMap(x-&gt;reactiveAction(x).thenReturn(x))

在使用项目反应器库的 Java 响应式编程期间,我偶然发现了一种模式,我想知道是否有开箱即用的支持?

所以我想要下面的代码:

Mono.just("hello")
    .flatMap(hello -> reactiveAction(hello).thenReturn(hello))
    ..
    .;
Run Code Online (Sandbox Code Playgroud)

变成类似的东西:

Mono.just("hello")
    .coolOperation(this::reactiveAction)
    ..
    .;   
Run Code Online (Sandbox Code Playgroud)

我不能使用 doOnNext 因为我想在 reactAction 中做的不是副作用。和反应动作是:

Mono<Integer> reactiveAction(String text){
  return ....
}
Run Code Online (Sandbox Code Playgroud)

java mono publisher reactive-programming project-reactor

1
推荐指数
1
解决办法
288
查看次数

如何控制 Flux.flatMap (Mono) 的并行度?

下面的代码并行执行所有 Web 请求 (webClient),不考虑我输入的限制parallel(5)

        Flux.fromIterable(dataListWithHundredsElements)
            .parallel(5).runOn(Schedulers.boundedElastic())
            .flatMap(element -> 
                webClient.post().
                .bodyValue(element)
                .retrieve()
                .bodyToMono(String.class)
                .doOnError(err -> element.setError(Utils.toString(err)))
                .doOnSuccess(r -> element.setResponse(r))
            )
            .sequential()
            .onErrorContinue((e, v) -> {})
            .doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

我想知道是否可以根据中指定的值执行请求parallel(5)以及如何最好地执行请求?

一个细节,此代码是一个 Spring MVC 应用程序,我正在向外部服务发出请求。

更新 01

实际上 Flux 创建了 5 个线程,但是,所有请求(WebClient Mono)都是同时执行的。

我想要的是一次执行 5 个请求,所以当 1 个请求结束时,另一个请求会启动,但任何时候都不应该有 5 个以上的并行请求。

由于 Mono 也是一种响应式类型,在我看来 Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。

更新 02 - 外部服务日志

这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。

2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8 …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot project-reactor spring-webflux

1
推荐指数
1
解决办法
1346
查看次数

Reactor Flux flatMap 算子吞吐量/并发控制并实现背压

我正在使用 Flux 来构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。如果我违反了每秒速率阈值,我将受到指数级的限制。每个系统都有自己的阈值。

我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。

在我的反应器管道中,WebClient 被包裹在 flatMap 中以执行 API 调用,如下面的代码:

WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string

    Flux.generator(generator) // Generator pushes the elements from source 1 at a time

    // make call to 1st API …
Run Code Online (Sandbox Code Playgroud)

spring-boot project-reactor reactive-streams spring-webflux spring-webclient

1
推荐指数
1
解决办法
1014
查看次数