Sam*_*Sam 24 java reactive-programming backpressure spring-webflux
我是Spring Web-Flux的首发.我写了一个控制器如下:
@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just("I am First Mono")
}
}
Run Code Online (Sandbox Code Playgroud)
我知道其中一个反应性好处是Backpressure,它可以平衡请求或响应率.我想知道如何在Spring Web-Flux中使用背压机制.
Ole*_*uka 57
为了理解Backpressure在当前WebFlux框架实现中的工作原理,我们必须回顾一下默认使用的传输层.我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器通信通常也是一样)是通过TCP连接完成的.WebFlux还使用该传输进行客户端和服务器之间的通信.然后,为了获得背压控制项的含义,我们必须从Reactive Streams规范的角度来回顾一下背压的含义.
基本语义定义了如何通过背压来调节流元素的传输.
因此,从该声明中,我们可以得出结论,在Reactive Streams中,背压是一种通过传输(通知)接收者可以消耗多少元素来调节需求的机制; 在这里,我们有一个棘手的问题.TCP具有字节抽象而不是逻辑元素抽象.我们通常所说的背压控制是控制向/从网络发送/接收的逻辑元件的数量.即使TCP有自己的流控制(参见这里的含义和那里的动画),这个流控制仍然是字节而不是逻辑元素.
在WebFlux模块的当前实现中,背压由传输流控制来调节,但它不会暴露接收方的实际需求.为了最终看到交互流程,请参见下图:
为简单起见,上图显示了两个微服务之间的通信,其中左侧发送数据流,右侧消耗该流.以下编号列表提供了该图表的简要说明:
正如我们从上图中可以看到的那样,接收者公开的需求与发送者的需求不同(这里需要逻辑元素).这意味着两者的需求是孤立的,仅适用于WebFlux < - >业务逻辑(服务)交互,并且较少暴露服务A < - >服务B交互的背压.
所有这些意味着背压控制在WebFlux中并不像我们预期的那样公平.
如果我们仍然希望对WebFlux中的背压进行不公平的控制,我们可以在Project Reactor运营商的支持下这样做limitRate().以下示例显示了我们如何使用该运算符:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
Run Code Online (Sandbox Code Playgroud)
正如我们从示例中看到的那样,limitRate()运算符允许一次定义要预取的元素数.这意味着即使最终订户请求Long.MAX_VALUE元素,limitRate运营商也会将该需求分成块,并且不允许一次消耗更多.我们可以使用元素发送过程:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
Run Code Online (Sandbox Code Playgroud)
上面的示例表明,即使WebFlux一次请求超过10个元素,也会limitRate()限制对预取大小的需求,并防止一次消耗超过指定数量的元素.
另一种选择是实现自己的Subscriber或扩展BaseSubscriber来自Project Reactor.例如,以下是我们如何做到这一点的简单例子:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
// do business logic there
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
Run Code Online (Sandbox Code Playgroud)
为了通过网络边界实现逻辑元素背压,我们需要一个适当的协议.幸运的是,有一种称为RScoket协议.RSocket是一种应用程序级协议,允许通过网络边界传输实际需求.该协议有一个RSocket-Java实现,允许设置RSocket服务器.在服务器到服务器通信的情况下,相同的RSocket-Java库也提供客户端实现.要了解有关如何使用RSocket-Java的更多信息,请参阅此处的以下示例.对于浏览器 - 服务器通信,有一个RSocket-JS实现,它允许通过WebSocket连接浏览器和服务器之间的流通信.
现在有一些框架,建立在RSocket协议之上.
其中一个框架是Proteus项目,它提供了构建在RSocket之上的完整的微服务.此外,Proteus与Spring框架很好地集成,所以现在我们可以实现公平的背压控制(参见那里的例子)
| 归档时间: |
|
| 查看次数: |
6054 次 |
| 最近记录: |