cip*_*ley 6 java spring reactive-programming spring-boot spring-webflux
我目前正在进行一个构建微服务的项目,并尝试从更传统的 Spring Boot 迁移RestClient到使用 Netty 并WebClient作为 HTTP 客户端的反应式堆栈,以便连接到后端系统。
WebClient这对于使用 REST API 的后端来说进展顺利,但是在实现连接到 SOAP 后端和 Oracle 数据库的服务时仍然遇到一些困难,这些数据库仍然使用传统的 JDBC。
我设法在网上找到一些关于 JDBC 调用的解决方法,这些方法利用并行调度程序来发布阻塞 JDBC 调用的结果:
//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
.onErrorResume(error -> Mono.error(error));
}
...
//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
List<TransactionManagerModel> result =
jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}
//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}
Run Code Online (Sandbox Code Playgroud)
我认为这非常有效。
而对于 SOAP 调用,我所做的是将 SOAP 调用封装起来,Mono而 SOAP 调用本身使用的CloseableHttpClient显然是一个阻塞的 HTTP 客户端。
//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}
//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
try {
SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
ObjectFactory headerFactory = new ObjectFactory();
AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
authHeader.setUserName(username);
authHeader.setPassWord(password);
JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
Marshaller marshaller = headerContext.createMarshaller();
marshaller.marshal(authHeader, soapHeader.getResult());
} catch (Exception ex) {
log.error("Failed to marshall SOAP Header!", ex);
}
});
return response;
...
}
Run Code Online (Sandbox Code Playgroud)
我的问题是:SOAP 调用的这种实现是否足够“反应性”,以至于我不必担心某些调用在微服务的某些部分被阻止?我已经实现了反应式堆栈 -block()显式调用将抛出异常,因为如果使用 Netty,则不允许这样做。
Schedulers或者我也应该在 SOAP 调用中调整并行的使用吗?
经过一番讨论后我会写一个答案。
Reactor 文档指出您应该将阻塞调用放在它们自己的调度程序上。这基本上是为了保持 Reactor 的非阻塞部分继续运行,如果有东西阻塞,那么 Reactor 将回退到传统的 Servlet 行为,这意味着为每个请求分配一个线程。
Reactor 有非常好的关于调度程序及其类型等的文档。
但简短地说:
当有人订阅时,reactor 将进入称为 the 的状态assembly phase,这意味着它基本上会从订阅点开始向后调用上游操作符,直到找到数据生产者(例如数据库或其他服务等)。如果它onSubscribe在此阶段的某个地方找到 -operator,它将把整个链放在它自己定义的 上Scheduler。所以要知道的一件好事是,放置的位置onSubscribe并不重要,只要在assembly phase整个链条中发现它就会受到影响。
用法示例可以是:
我们有对数据库的阻塞调用、使用阻塞休息客户端的慢速调用、在阻塞庄园中从系统读取文件等。
如果您onPublish在链中的某个位置,assembly phase链将知道它放置在哪里,链将从默认调度程序切换到该特定点的指定调度程序。所以onPublish放置确实很重要。因为它会在放置的位置切换。该运算符更多地控制您想要将某些内容放置在代码中特定点的特定调度程序上。
用法示例可以是:
您正在某个特定点进行一些严重阻塞的 cpu 计算,您可以切换到一个Scheduler.parallell()可以保证所有计算都将放置在单独的核心上进行繁重 cpu 工作的系统,当您完成后,您可以切换回默认调度程序。
Scheduler如果您的肥皂调用被阻塞,则应该将其单独放置,并且我认为onSubscribe使用 a 就足够了,Schedulers.elasticBound()可以获得传统的 servlet 行为。如果您觉得害怕在同一个调度程序上进行每个阻塞调用,您可以在Scheduler函数中传递asyncCallable并拆分调用以使用不同的Schedulers.
| 归档时间: |
|
| 查看次数: |
7756 次 |
| 最近记录: |