Rom*_*n Y 4 java scheduler flux project-reactor
我有一个 s 原语Flux,String并在main()方法中运行此代码。
package com.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import java.util.Arrays;
import java.util.List;
public class Parallel {
private static final Logger log = Loggers.getLogger(Parallel.class.getName());
private static List<String> COLORS = Arrays.asList("red", "white", "blue");
public static void main(String[] args) throws InterruptedException {
Flux<String> flux = Flux.fromIterable(COLORS);
flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}
Run Code Online (Sandbox Code Playgroud)
如果您尝试运行此代码,应用程序永远不会停止运行,您需要手动停止它。如果我替换.newParallel()为.parallel()一切正常,并且应用程序正常完成。
为什么它无法自行完成运行?为什么会挂?这种行为的原因是什么?
如果您将此代码作为 JUnit 测试运行,它可以正常工作并且不会挂起。
Scheduler您自己使用工厂方法创建的实例默认以非守护程序newXxx模式创建,这意味着它可以防止 JVM 退出。
System.exit()当所有测试都运行时,JUnit 会调用,这解释了为什么测试场景不会挂起。
在这种情况下,Schedulers.newSingle()和Schedulers.newParallel()变体是最严重的“罪犯”,因为创建的线程在不活动超时后不会被剔除,这与Schedulers.newBoundedElastic().
如果在现实场景中,您有一个明确定义的应用程序生命周期,您可以将实例存储Scheduler在某处(例如,作为 bean),并确保Scheduler#dispose()在应用程序生命周期结束时调用每个实例。
更简单的解决方案:使用相关工厂重载Schedulers显式创建。daemon == true
| 归档时间: |
|
| 查看次数: |
691 次 |
| 最近记录: |