当客户端中止请求时,WebFlux 如何停止发布者?

diz*_*iaq 0 java reactive-programming cancellation project-reactor spring-webflux

SpringBoot v2.5.1

有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).

客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。

通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?

@RestController 
class EndpointSpin {
 
  @GetMapping("/spin")
  Mono<Long> spin() {
    AtomicLong counter = new AtomicLong(0);
    Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));

    return Mono.fromCallable(() -> {

      while (Instant.now().isBefore(stopTime)) {
        counter.incrementAndGet();

        if (counter.get() % 10_000_000 == 0) {
          System.out.println(counter.get());
        }

        // of course this does not work
        if (Thread.currentThread().isInterrupted()){
           break;
        }
      }

      return counter.get();
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 5

fromCallable并不能保护您免受 内部计算的阻塞Callable,正如您的示例所示。

反应流中取消的主要方式是cancel()通过Subscription.

即使如此,避免反应式代码内阻塞代码的基本要求仍然成立,因为如果操作符足够简单(即同步),阻塞步骤甚至可以阻止信号的传播cancel()......

在仍然收到取消通知的同时适应非反应式代码的一种方法是Mono.create:它公开 a MonoSink(通过 a Consumer<MonoSink>),可用于将元素推送到下游,同时它有一个onCancel处理程序。

您需要将代码重写为例如。检查AtomicBoolean循环的每次迭代,并在接收器的处理程序中翻转 AtomicBoolean onCancel

Mono.create(sink -> {
    AtomicBoolean isCancelled = new AtomicBoolean();
    sink.onCancel(() -> isCancelled.set(true));
    while (...) {
        ...
        if (isCancelled.get()) break;
    }
});
Run Code Online (Sandbox Code Playgroud)

在您的示例中需要注意的另一件事是:AtomicInteger共享状态。如果您第二次订阅返回的Mono,两个订阅将共享计数器并递增它/并行检查它,这可能不好。

Consumer<MonoSink>在of内部创建这些状态变量Mono.create可确保每个订阅获得其自己的单独状态。