Flux未在Spring 5反应堆中订购

ale*_*leh 4 java reactive-programming project-reactor

我可能错过了一些东西,但我无法弄清楚它是什么.

以下代码什么都不做:

webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();
Run Code Online (Sandbox Code Playgroud)

如果我试图阻止呼叫它工作正常:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();
Run Code Online (Sandbox Code Playgroud)

奇怪的是,如果我"手动"创建一个Flux(即不是来自spring webClient),这很好用:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像它在最后一样?

谢谢!

Kos*_*tyn 9

简答

subscribe不阻止当前线程,这意味着应用程序主线程可以比Flux发出任何元素更早完成.所以要么block在主线程中使用或使用等待.

细节

调用no-args subscribe()只是request(unbounded)Flux没有设置的情况下启用Subscriber.它通常在单独的线程中触发操作,但不阻止当前线程.最有可能的是,您的主线程在WebClient收到该单独线程中的响应并且发生被动副作用doOnNext(...)之前结束.

为了说明/测试操作是否已开始,请在主线程中等待一段时间.通过subscribe()电话后立即拨打以下一行:

Thread.sleep(1000);
Run Code Online (Sandbox Code Playgroud)

现在,在播放超时值后,您将能够看到打印结果.

现在让我们隐式地Scheduler为异步操作发送一个自定义,并等待其所有任务完成.另外,让我们传递System.out::printlnas subscribe(...)参数而不是doOnNext,以便完整代码如下所示:

ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
    .bodyToMono(GetLocationsResponse.class)
    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
    .subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 
Run Code Online (Sandbox Code Playgroud)

此示例使用略有不同的订阅(Consumer).最重要的是,它增加了publishOn(调度),这是由支持ExecutorService.然后使用后者在主线程中等待终止.

当然,更容易实现相同结果的方法是使用block()最初提到的:

webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();
Run Code Online (Sandbox Code Playgroud)

最后,注意你的第三个例子Flux.just(...)...subscribe()- 似乎它在你的主线程终止之前很快完成.这是因为String与单个GetLocationsResponse元素的发射相比,它需要更少的时间来发出一些元素(这意味着写入请求的时间+读取响应+解析到POJO中).但是,如果你使这个Flux延迟元素,你会得到相同的行为:

Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
    .doOnNext(System.out::println)
    .subscribe(); 


Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500))
    .doOnNext(System.out::println)
    .blockLast(); //and that makes it printing back again
Run Code Online (Sandbox Code Playgroud)