diz*_*iaq 0 java reactive-programming cancellation project-reactor spring-webflux
SpringBoot v2.5.1
有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).
客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。
通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?
@RestController
class EndpointSpin {
@GetMapping("/spin")
Mono<Long> spin() {
AtomicLong counter = new AtomicLong(0);
Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));
return Mono.fromCallable(() -> {
while (Instant.now().isBefore(stopTime)) {
counter.incrementAndGet();
if (counter.get() % 10_000_000 == 0) {
System.out.println(counter.get());
}
// of course this does not work
if (Thread.currentThread().isInterrupted()){
break;
}
}
return counter.get();
});
}
}
Run Code Online (Sandbox Code Playgroud)
fromCallable并不能保护您免受 内部计算的阻塞Callable,正如您的示例所示。
反应流中取消的主要方式是cancel()通过Subscription.
即使如此,避免反应式代码内阻塞代码的基本要求仍然成立,因为如果操作符足够简单(即同步),阻塞步骤甚至可以阻止信号的传播cancel()......
在仍然收到取消通知的同时适应非反应式代码的一种方法是Mono.create:它公开 a MonoSink(通过 a Consumer<MonoSink>),可用于将元素推送到下游,同时它有一个onCancel处理程序。
您需要将代码重写为例如。检查AtomicBoolean循环的每次迭代,并在接收器的处理程序中翻转 AtomicBoolean onCancel:
Mono.create(sink -> {
AtomicBoolean isCancelled = new AtomicBoolean();
sink.onCancel(() -> isCancelled.set(true));
while (...) {
...
if (isCancelled.get()) break;
}
});
Run Code Online (Sandbox Code Playgroud)
在您的示例中需要注意的另一件事是:AtomicInteger共享状态。如果您第二次订阅返回的Mono,两个订阅将共享计数器并递增它/并行检查它,这可能不好。
Consumer<MonoSink>在of内部创建这些状态变量Mono.create可确保每个订阅获得其自己的单独状态。