Liv*_*lea 7 java spring-boot project-reactor reactive
我目前正在netty和jOOQ上使用SpringBoot 2,spring-boot-starter-webflux开发一个应用程序.
下面是我经过数小时的研究和stackoverflow搜索后得出的代码.我已经内置了很多日志记录,以便查看在哪个线程上发生了什么.
UserController的:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
Run Code Online (Sandbox Code Playgroud)
UserService:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}
Run Code Online (Sandbox Code Playgroud)
userDAO的:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}
Run Code Online (Sandbox Code Playgroud)
代码按预期工作,"接收请求"和"发送响应"都在同一个线程(reactor-http-server-epoll-x)上运行,而阻塞代码(对imUserDao.insertUser(u)的调用)运行在弹性调度程序线程(elastic-x).事务绑定到调用带注释的方法的线程(这是弹性-x),因此按预期工作(我已经使用不在此处发布的不同方法对其进行测试,以保持简单).
这是一个日志示例:
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7
Run Code Online (Sandbox Code Playgroud)
我已经研究了反应式编程很长一段时间了,但从来没有完全对任何反应性程序进行编程.既然我是,我想知道我是否正确地做到了.所以这是我的问题:
1.上面的代码是处理传入HTTP请求,查询数据库然后响应的好方法吗?为了我的理智,请忽略我内置的logger.debug(...)调用:)我希望有一个Flux <ImUser>作为控制器方法的参数,就我所知多个潜在请求的流将在某个时刻到来,并且将以相同的方式处理.相反,我发现的例子创建了一个Mono.from(...); 每次请求进来.
2.在UserService(Mono.just(用户))中创建的第二个Mono 感觉有点尴尬.我知道我需要启动一个新流才能在弹性调度程序上运行代码,但是不是有运营商这样做吗?
3.从编写代码的方式来看,我理解UserService中的Mono将被阻塞,直到数据库操作完成,但不会阻止服务于请求的原始流.它是否正确?
4.我打算用并行调度程序替换Schedulers.elastic(),我可以在其中配置工作线程数.这个想法是最大工作线程的数量应该与最大数据库连接相同.当调度程序中的所有工作线程都忙时会发生什么?当背压跳进去的时候?
5.我最初希望在我的控制器中有这个代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
Run Code Online (Sandbox Code Playgroud)
但我无法实现这一点并保持正确的线程运行.有没有办法在我的代码中实现这一点?
任何帮助将不胜感激.谢谢!
服务和控制器
您的服务阻塞的事实是有问题的,因为在控制器中您从 a 内部调用阻塞方法,map而该方法不会在单独的线程上移动。这有可能阻止所有控制器。
相反,您可以做的是 return a Monofrom UserService#create(删除block()最后的)。由于服务保证了Dao方法调用是隔离的,所以问题较少。从那里开始,不需要Mono.just(user)在控制器中执行任何操作:只需直接在生成的 Mono 上调用 create 和 start 链接运算符即可:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
Run Code Online (Sandbox Code Playgroud)
请
注意,如果您想记录某些内容,有比执行 amap并返回更好的选择it:
doOnNext方法是为此量身定制的:对反应信号之一做出反应(在本例中,onNext:发出一个值)并执行一些非变异操作,使输出序列与源序列完全相同。doOn 的“副作用”可以是写入控制台或增加统计计数器,例如...还有 doOnComplete、doOnError、doOnSubscribe、doOnCancel 等...
log只需按照上面的顺序记录所有事件。它将检测您是否使用 SLF4J 并在 DEBUG 级别使用配置的记录器(如果是)。否则,它将使用 JDK 日志记录功能(因此您还需要对其进行配置以显示 DEBUG 级别日志)。
在反应式编程中,有关事务或任何依赖于ThreadLocal
ThreadLocal 和线程粘性的事物可能会出现问题,因为底层执行模型在整个序列中保持不变的保证较少。AFlux可以分几个步骤执行,每个步骤都在不同的Scheduler(以及线程或线程池)中。即使在特定步骤中,一个值也可以由底层线程池的线程 A 处理,而稍后到达的下一个值将在线程 B 上处理。
在这种情况下,依赖 Thread Local 就不那么简单了,我们目前正在积极致力于提供更适合反应式世界的替代方案。
您创建一个连接池大小的池的想法很好,但不一定足够,事务通量可能会使用多个线程,因此可能会用事务污染某些线程。
当池用完线程时会发生什么
如果您使用特定的方法Scheduler来隔离像这里这样的阻塞行为,一旦它用完线程,它就会抛出一个RejectedExecutionException.
| 归档时间: |
|
| 查看次数: |
2689 次 |
| 最近记录: |