ale*_*leh 4 java reactive-programming project-reactor
我可能错过了一些东西,但我无法弄清楚它是什么.
以下代码什么都不做:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
如果我试图阻止呼叫它工作正常:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
Run Code Online (Sandbox Code Playgroud)
奇怪的是,如果我"手动"创建一个Flux(即不是来自spring webClient),这很好用:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下我做错了什么吗?是不是.subscribe()应该在第一种情况下执行操作,就像它在最后一样?
谢谢!
简答
subscribe不阻止当前线程,这意味着应用程序主线程可以比Flux发出任何元素更早完成.所以要么block在主线程中使用或使用等待.
细节
调用no-args subscribe()只是request(unbounded)在Flux没有设置的情况下启用Subscriber.它通常在单独的线程中触发操作,但不阻止当前线程.最有可能的是,您的主线程在WebClient收到该单独线程中的响应并且发生被动副作用doOnNext(...)之前结束.
为了说明/测试操作是否已开始,请在主线程中等待一段时间.通过subscribe()电话后立即拨打以下一行:
Thread.sleep(1000);
Run Code Online (Sandbox Code Playgroud)
现在,在播放超时值后,您将能够看到打印结果.
现在让我们隐式地Scheduler为异步操作发送一个自定义,并等待其所有任务完成.另外,让我们传递System.out::printlnas subscribe(...)参数而不是doOnNext,以便完整代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
Run Code Online (Sandbox Code Playgroud)
此示例使用略有不同的订阅(Consumer).最重要的是,它增加了publishOn(调度),这是由支持ExecutorService.然后使用后者在主线程中等待终止.
当然,更容易实现相同结果的方法是使用block()最初提到的:
Run Code Online (Sandbox Code Playgroud)webClient.get().uri("/some/path/here").retrieve() .bodyToMono(GetLocationsResponse.class) .doOnNext(System.out::println) .block();
最后,注意你的第三个例子Flux.just(...)...subscribe()- 似乎它在你的主线程终止之前很快完成.这是因为String与单个GetLocationsResponse元素的发射相比,它需要更少的时间来发出一些元素(这意味着写入请求的时间+读取响应+解析到POJO中).但是,如果你使这个Flux延迟元素,你会得到相同的行为:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1103 次 |
| 最近记录: |