Spring webflux和从数据库中读取

Luk*_*asz 24 spring reactive-programming

Spring 5为webflux引入了rest API的反应式编程风格.我自己也很陌生,并且想知道将数据库的同步调用包装成有意义Flux还是Mono有意义?如果是,这是这样做的方式:

@RestController
public class HomeController {

    private MeasurementRepository repository;

    public HomeController(MeasurementRepository repository){
        this.repository = repository;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
    }

}
Run Code Online (Sandbox Code Playgroud)

是否有类似异步CrudRepository的东西?我找不到它.

Gry*_*har 27

一种选择是使用完全非阻塞的备用SQL客户端.一些示例包括:https: //github.com/mauricio/postgresql-asynchttps://github.com/finagle/roc.当然,数据库供应商尚未正式支持这些驱动程序.此外,与基于JDBC的成熟抽象(如Hibernate或jOOQ)相比,功能的吸引力要小得多.

从Scala世界来到我的另一个想法.我们的想法是将阻塞调用分配到隔离的ThreadPool中,而不是将阻塞和非阻塞调用混合在一起.这将允许我们控制线程的总数,并让CPU在主执行上下文中提供非阻塞任务,并进行一些潜在的优化.假设我们有基于JDBC的实现,例如确实阻塞的Spring Data JPA,我们可以使它的执行异步并在专用线程池上调度.

@RestController
public class HomeController {

    private final MeasurementRepository repository;
    private final Scheduler scheduler;

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler);
    }

}
Run Code Online (Sandbox Code Playgroud)

我们的JDBC调度程序应该使用专用线程池进行配置,其中大小计数等于连接数.

@Configuration
public class SchedulerConfiguration {
    private final Integer connectionPoolSize;

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

}
Run Code Online (Sandbox Code Playgroud)

但是,这种方法存在困难.主要是交易管理.在JDBC中,事务只能在单个java.sql.Connection中进行.要在一个事务中进行多个操作,他们必须共享连接.如果我们想在它们之间进行一些计算,我们必须保持连接.这不是很有效,因为我们在两者之间进行计算时保持有限数量的连接空闲.

这种异步JDBC包装器的想法并不新鲜,并且已在Scala库Slick 3中实现.最后,非阻塞JDBC可能会出现在Java路线图中.正如它在2016年9月在JavaOne上宣布的那样,我们可能会在Java 10中看到它.

  • 在代码示例中,publishOn(scheduler)调用对我来说很奇怪.请转发给我们,而不是订阅哪个在给定的调度程序上运行请求()给这个发布者? (4认同)
  • 它应该是使用线程池的subscribeOn()方法而不是上面代码所具有的publishOn(). (4认同)

Dmy*_*nko 8

根据此博客,您应该按照以下方式重写代码段

@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
    return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
           .subscribeOn(Schedulers.elastic());
}
Run Code Online (Sandbox Code Playgroud)


you*_*jad 5

Spring数据支持Mongo和Cassandra的反应式存储库接口.

Spring数据MongoDb Reactive Interface

Spring Data MongoDB通过Project Reactor和RxJava 1反应类型提供反应式存储库支持.反应API支持反应类型之间的反应型转换.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

    Flux<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Flux<Person> findByLastname(Mono<String> lastname);

    Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Flux<Person> findWithTailableCursorBy();

}

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

    Observable<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Observable<Person> findByLastname(Single<String> lastname);

    Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Observable<Person> findWithTailableCursorBy();
}
Run Code Online (Sandbox Code Playgroud)


kkd*_*927 5

获得 Flux 或 Mono 并不一定意味着它会在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身运行在进行 subscribe() 调用的线程上。

如果您要使用阻塞持久性 API(JPA、JDBC)或网络 API,那么 Spring MVC 至少是通用架构的最佳选择。Reactor 和 RxJava 在单独的线程上执行阻塞调用在技术上是可行的,但您不会充分利用非阻塞 Web 堆栈。

所以......我如何包装一个同步的、阻塞的调用?

使用Callable到延迟执行。您应该使用Schedulers.elastic它,因为它创建了一个专用线程来等待阻塞资源而不占用其他资源。

  • Schedulers.immediate() :当前线程。
  • Schedulers.single() :单个可重用线程。
  • Schedulers.newSingle() :每次调用的专用线程。
  • Schedulers.elastic() :弹性线程池。它根据需要创建新的工作池,并重用空闲的工作池。例如,这是 I/O 阻塞工作的不错选择。
  • Schedulers.parallel() :针对并行工作进行调整的固定工作线程池。

例子:

Mono.fromCallable(() -> blockingRepository.save())
        .subscribeOn(Schedulers.elastic());
Run Code Online (Sandbox Code Playgroud)