Spring Boot Webflux/Netty - 检测关闭连接

Mic*_*ler 6 spring spring-boot project-reactor reactive-streams spring-webflux

我一直在2.0.0.RC1使用webflux starter(spring-boot-starter-webflux)开始使用spring-boot .我创建了一个简单的控制器,返回无限的通量.我希望发布者只有在有客户端(订阅者)时才能正常工作.假设我有一个像这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}
Run Code Online (Sandbox Code Playgroud)

现在,当我尝试运行该代码并使用Chrome 访问端点http:// localhost:8080 /时,我可以看到数据.但是,一旦我关闭浏览器,while循环就会继续,因为没有触发取消事件.关闭浏览器后,如何终止/取消流媒体?

从这个答案我引用:

目前使用HTTP,确切的背压信息不会通过网络传输,因为HTTP协议不支持此功能.如果我们使用不同的线路协议,这可能会改变.

我认为,由于HTTP协议不支持背压,这意味着也不会发出取消请求.

通过分析网络流量进一步调查,显示浏览器在FIN关闭浏览器后立即发送TCP .有没有办法配置Netty(或其他),以便半封闭连接将触发发布者的取消事件,使while循环停止?

或者我是否必须编写自己的适配器,类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter我实现自己的订阅服务器的地方?

谢谢你的帮助.

编辑: 一个IOException将在尝试,如果没有客户端将数据写入套接字提高.正如您在堆栈跟踪中看到的那样.

但这还不够好,因为在下一个数据块准备好发送之前可能需要一段时间,因此需要花费相同的时间来检测已经消失的客户端.正如Brian Clozel所说,它是Reactor Netty中的一个已知问题.我试图通过添加依赖项来使用Tomcat POM.xml.像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)

虽然它取代了Netty并使用了Tomcat,但由于浏览器没有显示任何数据,它似乎没有被动反应.但是,控制台中没有警告/信息/异常.spring-boot-starter-webflux这个版本(2.0.0.RC1)应该与Tomcat一起工作?

Mic*_*ler 8

由于这是一个已知问题(请参见Brian Clozel的答案),所以我最终使用一个Flux来获取我的真实数据,并使用另一个来实现某种类型的ping /心跳机制。结果,我与合并在一起Flux.merge()

在这里,您可以看到我的解决方案的简化版本:

@RestController
public class Demo {

    public interface Notification{}

    public static class MyData implements Notification{
        …
        public boolean isEmpty(){…}
    }

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    }

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    }

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> {

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                })
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    }
}
Run Code Online (Sandbox Code Playgroud)

我把所有东西都打包成ServerSentEvent<? extends Notification>Notification只是一个标记界面。我使用类中的event字段ServerSentEvent来区分数据和ping事件。由于心跳会Flux在短时间内连续不断地发送事件,因此检测到客户端消失的时间最多为该时间间隔的长度。请记住,我需要这样做,因为可能要花一些时间才能获得一些可以发送的真实数据,结果,它可能还需要一段时间才能检测到客户端已消失。这样,它将在客户端无法发送ping(或消息事件)后立即检测到客户端已消失。

标记界面上的最后一个注释,我称为“通知”。这并不是真正必要的,但是它提供了一些类型安全性。否则,我们可以为getNotificationStream()方法编写Flux<ServerSentEvent<?>>而不是Flux<ServerSentEvent<? extends Notification>>作为返回类型。或者也可以使getHeartbeatStream()返回Flux<ServerSentEvent<MyData>>。但是,这样可以允许发送任何对象,而这是我所不希望的。结果,我添加了接口。