Łuk*_*icz 5 spring spring-boot project-reactor spring-webflux
我正在尝试使用 Spring WebFlux 构建一个简单的聊天服务器。这很简单,而且工作起来也很顺利。我现在想要实现的是服务器端 Flux 流的终止。想象一下有一个无限的 Flux 暴露如下:
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)
Run Code Online (Sandbox Code Playgroud)
我有 10 个客户端/订阅者连接到该事件流。现在我想终止一个特定客户端的连接,因为例如用户在聊天中咒骂。任何。是否可以管理/识别此类端点的订阅者?
您可以利用运算符构建一些东西.takeUntilOther(Publisher),并Publisher在用户应该断开连接时发出给定的信号......
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
return Flux.from(/* your logic to build your flux */)
.takeUntilOther(disconnector.onDisconnectUser(user));
}
Run Code Online (Sandbox Code Playgroud)
一种可能的实现onDisconnectUser(String user)是过滤Flux<String>发出用户名以按给定用户名断开连接的“全局”热点。也许是这样的:
public class UserDisconnecter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public UserDisconnecter() {
this.processor = DirectProcessor.create();
this.sink = this.processor.sink();
}
/**
* Signals that all existing streams for this user should be disconnected.
*/
public void disconnectUser(String user) {
this.sink.next(user);
}
/**
* Returns a Mono that emits when the given user should be disconnected.
*/
public Mono<String> onDisconnectUser(String user) {
return processor
.filter(user::equals)
.next();
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个简单的实现,但应该可以帮助您入门。
| 归档时间: |
|
| 查看次数: |
1553 次 |
| 最近记录: |