Luk*_*asz 24 spring reactive-programming
Spring 5为webflux引入了rest API的反应式编程风格.我自己也很陌生,并且想知道将数据库的同步调用包装成有意义Flux
还是Mono
有意义?如果是,这是这样做的方式:
@RestController
public class HomeController {
private MeasurementRepository repository;
public HomeController(MeasurementRepository repository){
this.repository = repository;
}
@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
}
}
Run Code Online (Sandbox Code Playgroud)
是否有类似异步CrudRepository的东西?我找不到它.
Gry*_*har 27
一种选择是使用完全非阻塞的备用SQL客户端.一些示例包括:https: //github.com/mauricio/postgresql-async或https://github.com/finagle/roc.当然,数据库供应商尚未正式支持这些驱动程序.此外,与基于JDBC的成熟抽象(如Hibernate或jOOQ)相比,功能的吸引力要小得多.
从Scala世界来到我的另一个想法.我们的想法是将阻塞调用分配到隔离的ThreadPool中,而不是将阻塞和非阻塞调用混合在一起.这将允许我们控制线程的总数,并让CPU在主执行上下文中提供非阻塞任务,并进行一些潜在的优化.假设我们有基于JDBC的实现,例如确实阻塞的Spring Data JPA,我们可以使它的执行异步并在专用线程池上调度.
@RestController
public class HomeController {
private final MeasurementRepository repository;
private final Scheduler scheduler;
public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
this.repository = repository;
this.scheduler = scheduler;
}
@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler);
}
}
Run Code Online (Sandbox Code Playgroud)
我们的JDBC调度程序应该使用专用线程池进行配置,其中大小计数等于连接数.
@Configuration
public class SchedulerConfiguration {
private final Integer connectionPoolSize;
public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
}
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
}
Run Code Online (Sandbox Code Playgroud)
但是,这种方法存在困难.主要是交易管理.在JDBC中,事务只能在单个java.sql.Connection中进行.要在一个事务中进行多个操作,他们必须共享连接.如果我们想在它们之间进行一些计算,我们必须保持连接.这不是很有效,因为我们在两者之间进行计算时保持有限数量的连接空闲.
这种异步JDBC包装器的想法并不新鲜,并且已在Scala库Slick 3中实现.最后,非阻塞JDBC可能会出现在Java路线图中.正如它在2016年9月在JavaOne上宣布的那样,我们可能会在Java 10中看到它.
根据此博客,您应该按照以下方式重写代码段
@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
.subscribeOn(Schedulers.elastic());
}
Run Code Online (Sandbox Code Playgroud)
Spring数据支持Mongo和Cassandra的反应式存储库接口.
Spring数据MongoDb Reactive Interface
Spring Data MongoDB通过Project Reactor和RxJava 1反应类型提供反应式存储库支持.反应API支持反应类型之间的反应型转换.
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(String lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Flux<Person> findByLastname(Mono<String> lastname);
Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Flux<Person> findWithTailableCursorBy();
}
public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {
Observable<Person> findByLastname(String lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Single<Person> findByFirstnameAndLastname(String firstname, String lastname);
// Accept parameter inside a reactive type for deferred execution
Observable<Person> findByLastname(Single<String> lastname);
Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);
@InfiniteStream // Use a tailable cursor
Observable<Person> findWithTailableCursorBy();
}
Run Code Online (Sandbox Code Playgroud)
获得 Flux 或 Mono 并不一定意味着它会在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身运行在进行 subscribe() 调用的线程上。
如果您要使用阻塞持久性 API(JPA、JDBC)或网络 API,那么 Spring MVC 至少是通用架构的最佳选择。Reactor 和 RxJava 在单独的线程上执行阻塞调用在技术上是可行的,但您不会充分利用非阻塞 Web 堆栈。
所以......我如何包装一个同步的、阻塞的调用?
使用Callable
到延迟执行。您应该使用Schedulers.elastic
它,因为它创建了一个专用线程来等待阻塞资源而不占用其他资源。
例子:
Mono.fromCallable(() -> blockingRepository.save())
.subscribeOn(Schedulers.elastic());
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
18119 次 |
最近记录: |