终止服务器上的特定 Flux 流

Łuk*_*icz 5 spring spring-boot project-reactor spring-webflux

我正在尝试使用 Spring WebFlux 构建一个简单的聊天服务器。这很简单,而且工作起来也很顺利。我现在想要实现的是服务器端 Flux 流的终止。想象一下有一个无限的 Flux 暴露如下:

@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)
Run Code Online (Sandbox Code Playgroud)

我有 10 个客户端/订阅者连接到该事件流。现在我想终止一个特定客户端的连接,因为例如用户在聊天中咒骂。任何。是否可以管理/识别此类端点的订阅者?

Phi*_*lay 6

您可以利用运算符构建一些东西.takeUntilOther(Publisher),并Publisher在用户应该断开连接时发出给定的信号......

@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
    return Flux.from(/* your logic to build your flux */)
               .takeUntilOther(disconnector.onDisconnectUser(user));
}
Run Code Online (Sandbox Code Playgroud)

一种可能的实现onDisconnectUser(String user)是过滤Flux<String>发出用户名以按给定用户名断开连接的“全局”热点。也许是这样的:

public class UserDisconnecter {

    private final FluxProcessor<String, String> processor;
    private final FluxSink<String> sink;

    public UserDisconnecter() {
        this.processor = DirectProcessor.create();
        this.sink = this.processor.sink();
    }

    /**
     * Signals that all existing streams for this user should be disconnected.
     */
    public void disconnectUser(String user) {
        this.sink.next(user);
    }

    /**
     * Returns a Mono that emits when the given user should be disconnected.
     */
    public Mono<String> onDisconnectUser(String user) {
        return processor
            .filter(user::equals)
            .next();
    }
}
Run Code Online (Sandbox Code Playgroud)

这是一个简单的实现,但应该可以帮助您入门。