lok*_*ely 0 reactive-programming reactive-streams spring-webflux
我正在学习反应堆核心并关注这个https://www.baeldung.com/reactor-core
ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(arrList::add);
System.out.println("After: " + arrList);
Run Code Online (Sandbox Code Playgroud)
当我执行上面的代码行时,给出。
Before: []
[DEBUG] (main) Using Console logging
After: []
Run Code Online (Sandbox Code Playgroud)
上面的代码行应该在另一个线程中开始执行,但它根本不起作用。有人可以帮我吗?
正如 Reactor 文档中提到的各种subscribe方法:
请记住,由于序列可以是异步的,这将立即将控制权返回给调用线程。这会给人一种印象,例如在主线程或单元测试中执行时不会调用使用者。
这意味着到达 main 方法的末尾,因此主线程在任何线程能够订阅反应链之前退出,正如 Piotr 所提到的。
您要做的是等到整个链完成后再打印数组的内容。
这样做的天真方法是:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);
Run Code Online (Sandbox Code Playgroud)
在这里,您在主线程上阻塞执行,直到处理完 Flux 上的最后一个元素。因此,在您的 ArrayList 完全填充之前,最后一个 System.out 不会执行。
请记住,代码在控制台应用程序中的运行方式与 Netty 等服务器环境中的运行方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是将block.
但是在并行线程上不允许阻塞。所以这种方法在 Netty 环境中不起作用。您的服务器将在那里运行,直到明确关闭,所以没问题subscribe。
但是,在上面的代码片段中,您阻止不仅是为了防止应用程序退出,而且是为了在读取已填充的数据之前等待。
对上述代码的改进如下:
ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();
Run Code Online (Sandbox Code Playgroud)
即使在这里,doOnComplete从反应链外部访问数据。为了防止这种情况,您可以在链本身中收集 Flux 的元素,如下所示:
System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();
Run Code Online (Sandbox Code Playgroud)
再次记住,当在 Netty 中运行时(比如 Spring Webflux 应用程序),上面的代码将以subscribe().
但是请注意,从 Flux 切换到列表(或任何集合)意味着您正在从反应式范式切换到命令式编程。您应该能够在 Reactive 范式本身内实现任何功能。
我认为有些混乱。当你打电话时subscribeOn(Schedulers.parallel())。您指定要在不同线程上接收项目。此外,您必须减慢代码速度,以便订阅 cen 真正启动(这就是我添加的原因Thread.sleep(100))。如果您运行我通过的代码,它就可以工作。你看反应堆中没有神奇的同步机制。
ArrayList<Integer> arrList = new ArrayList<Integer>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(
t -> {
System.out.println(t + " thread id: " + Thread.currentThread().getId());
arrList.add(t);
}
);
System.out.println("size of arrList(before the wait): " + arrList.size());
System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
Thread.sleep(100);
System.out.println("size of arrList(after the wait): " + arrList.size());
Run Code Online (Sandbox Code Playgroud)
如果您想将您的项目添加到并行反应器列表中,则不是一个好的选择。最好在 Java 8 中使用并行流。
List<Integer> collect = Stream.of(1, 2, 3, 4)
.parallel()
.map(i -> i * 2)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
您发布的那个教程在并发部分方面不是很精确。作者相信他/她说更多的文章即将到来。但我认为根本不应该发布那个特定的例子,因为它会造成混乱。我建议不要太相信互联网上的资源:)
| 归档时间: |
|
| 查看次数: |
2835 次 |
| 最近记录: |