在Spring Webflux中执行阻止JDBC调用

abs*_*hit 5 java spring-boot project-reactor spring-webflux

我正在使用Spring Webflux和Spring Data jpa,并使用PostgreSql作为后端数据库。我不想在进行诸如find和的数据库调用时阻塞主线程save。为了达到同样的目的,我在Controller类和jdbcScheduler服务类中都有一个主调度程序。

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,在服务层中进行获取/保存调用时,我做了:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }
Run Code Online (Sandbox Code Playgroud)

在控制器中,我这样做:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }
Run Code Online (Sandbox Code Playgroud)

它是否正确?和/或有更好的方法吗?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
Run Code Online (Sandbox Code Playgroud)

是该任务将在jdbcScheduler线程上运行,以后结果将在我的主并行上发布scheduler。这种理解正确吗?

Bri*_*zel 7

您对publishOn和的理解是正确的subscribeOn请参阅反应堆项目中有关这些操作员的参考文档)。

如果您调用阻塞库而不调度它在特定调度程序上工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),并且您的应用程序将只能同时处理少数请求。

现在我不确定你想通过这样做来实现什么。

首先,并行调度程序是为 CPU 密集型任务设计的,这意味着您将拥有的任务很少,与 CPU 内核一样多(或更多)。在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的核心数。您的应用将无法处理大量并发请求。

即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是 Spring WebFlux 中本机调度请求处理的地方。

如果您的最终目标是性能和可伸缩性,那么在反应式应用程序中包装阻塞调用的性能可能比您的常规 Servlet 容器更差。

您可以改为使用 Spring MVC 和:

  • 在处理阻塞库时使用通常的阻塞返回类型,比如 JPA
  • 当您未绑定到此类库时使用MonoFlux返回类型

这不会是非阻塞的,但这仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。

  • 我完全不同意***如果您的最终目标是性能和可扩展性,那么将阻塞调用包装在反应式应用程序中可能会比常规 Servlet 容器表现更差***。在反应式应用程序中包装阻塞类仍然比基于线程的 Servlet 容器更好,因为您只在阻塞调用上使用基于线程的样式(线程池或 Servlet 容器)而不是所有内容。 (4认同)
  • 如果您省略“publishOn”,那么事情将在您在 subscribeon 中配置的调度程序上运行。所以不,netty 循环上不会再发生任何事情。M 建议使用 Spring MVC,因为它支持响应式返回类型。它可能比 WebFlux 包装阻塞调用的性能更好 (2认同)