f1d*_*ave 6 java project-reactor
我有一个spring-webflux API,它在服务层需要从使用JDBC的现有存储库中读取.
在完成了关于这个主题的阅读之后,我想将阻塞数据库调用的执行与其他非阻塞异步代码分开.
我已经定义了一个专用的jdbcScheduler:
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}
Run Code Online (Sandbox Code Playgroud)
以及使用它的AsyncWrapper实用程序:
@Component
public class AsyncJdbcWrapper {
private final Scheduler jdbcScheduler;
@Autowired
public AsyncJdbcWrapper(Scheduler jdbcScheduler) {
this.jdbcScheduler = jdbcScheduler;
}
public <T> Mono<T> async(Callable<T> callable) {
return Mono.fromCallable(callable)
.subscribeOn(jdbcScheduler)
.publishOn(Schedulers.parallel());
}
}
Run Code Online (Sandbox Code Playgroud)
然后用于包装jdbc调用,如下所示:
Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
.map(userOption -> userOption.map(u -> u.getId())
.orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));
Run Code Online (Sandbox Code Playgroud)
我有两个问题:
1)我是否正确地将阻塞调用的执行推送到另一组线程?对于这些东西相当新,我正在努力与subscribeOn()/ publishOn()的复杂性.
2)假设我想使用生成的单声道,例如调用带有userIdMono结果的API,将在哪个调度程序上执行?专门为jdbc调用创建的那个,或者反应堆通常在其中运行的主(?)线程?例如
userIdMono.map(id -> someApiClient.call(id));
Run Code Online (Sandbox Code Playgroud)
1)使用subscribeOn正确地将JDBC工作放在jdbcScheduler
2)都不是,结果了Callable-而在jdbcScheduler计算,是publishOn该parallel计划,所以你map会从一个线程中执行Schedulers.parallel()池(而不是霸占jdbcScheduler).