如何处理sse连接关闭?

Erd*_*mir 0 server-sent-events spring-boot project-reactor spring-session

我有一个端点,如示例代码块中所示。流式传输时,我通过 调用异步方法streamHelper.getStreamSuspendCount()。我正在更改状态时停止此异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。更改状态时,我将停止会话范围内的异步方法。但当浏览器关闭且会话终止时,我无法访问此异步方法。会话关闭时如何访问此范围?

@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
    ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
    return streamHelper.getStreamSuspendCount(columnObject);
}


getStreamSuspendCount(ColumnObject columnObject) {
   ...
   //async flux
   Flux<?> newFlux = beSubscribeFlow.get(i);
   Disposable disposable = newFlux.subscribe();
   beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
   ...
   return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
                    .doOnNext(i -> {
                        System.out.println(i);
                    }));
}
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 5

我认为部分问题在于您试图Disposable在会话结束时获取您想要调用的对象。但这样做时,您自己就订阅了该序列。Spring Framework 还将订阅Flux返回的getStreamSuspendCount,并且需要取消该订阅才能让 SSE 客户端收到通知。

现在如何实现这一目标?你需要的是一种“阀门”,它会在收到外部信号时取消其来源。这就是takeUntilOther(Publisher<?>)作用。

因此,现在您需要一个Publisher<?>可以与会话生命周期(更具体地说是会话关闭事件)绑定的对象:一旦发出,takeUntilOther就会取消其源。

那里有 2 个选项:

  • 会话关闭事件在类似侦听器的 API 中公开:使用Mono.create
  • 您确实需要手动触发取消:使用MonoProcessor.create()并在时机到来时通过它推送任何值

以下是带有 API 的简化示例,以供澄清:

创造

return theFluxForSSE.takeUntilOther(Mono.create(sink ->
    sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));
Run Code Online (Sandbox Code Playgroud)

单处理器

MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
Run Code Online (Sandbox Code Playgroud)

让我们用计划任务模拟会话关闭:

Executors.newSingleThreadScheduledExecutor().schedule(() ->
    processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)

这是一个模拟单元测试示例,您可以运行它来更好地了解发生的情况:

@Test
public void simulation() {
    Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));

    MonoProcessor<String> processor = MonoProcessor.create();
    Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);

    theFluxForSSE.takeUntilOther(processor.log())
                 .log()
                 .blockLast();
}
Run Code Online (Sandbox Code Playgroud)