我正在为 WebFlux SSE 端点实现心跳。为了避免客户端超时,我想确保至少每隔(比如 10 秒)发出一个元素。
我提出了以下解决方案,无论是否已发出真实元素,每 10 秒发出一次心跳元素:
originalFlux.mergeWith(Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(10)).map(ignored -> "heartbeat")
Run Code Online (Sandbox Code Playgroud)
这对于我的用例来说可能已经足够好了,但我仍然想知道是否只有在过去 10 秒内没有发出任何实际元素的情况下才可以发出心跳。我使用了timeout运算符,它完全实现了我正在寻找的计时行为,但是它会发出错误并取消,originalFlux而不是仅仅发出额外的元素。
以下代码通过timeout了我的测试,但看起来太复杂,据我所知,如果originalFlux在取消和重新订阅之间发出元素,则可能会丢失元素:
ConnectableFlux<String> sharedOriginalFlux = originalFlux.publish();
CompletableFuture<Disposable> eventualSubscription = new CompletableFuture<>();
return addHeartbeat(sharedOriginalFlux)
.doOnSubscribe(ignored -> eventualSubscription.complete(sharedOriginalFlux.connect()))
.doFinally(ignored -> eventualSubscription.thenAccept(Disposable::dispose))
private Flux<String> addHeartbeat(Flux<String> sharedOriginalFlux) {
return sharedOriginalFlux.timeout(
Duration.ofSeconds(10),
Flux.mergeSequential(
Mono.just("heartbeat"),
Flux.defer(() -> addHeartbeat(sharedOriginalFlux))));
}
Run Code Online (Sandbox Code Playgroud)
有没有一种简单且安全的方法来做到这一点?
它不一定更简单,但另一种选择可能是创建一个单独的处理器,可以包装原始处理器Flux以提供心跳(不应错过任何元素):
public class HeartbeatProcessor<T> {
private final FluxProcessor<T, T> processor;
private final FluxSink<T> sink;
private final T heartbeatValue;
private final Duration heartbeatPeriod;
private Disposable d;
public HeartbeatProcessor(Flux<T> orig, T heartbeatValue, Duration heartbeatPeriod) {
this.heartbeatValue = heartbeatValue;
this.heartbeatPeriod = heartbeatPeriod;
this.processor = DirectProcessor.<T>create().serialize();
this.sink = processor.sink();
this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
orig.subscribe(this::emit);
}
private void emit(T val) {
sink.next(val);
d.dispose();
this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
}
public Flux<T> getFlux() {
return processor;
}
}
Run Code Online (Sandbox Code Playgroud)
然后您可以按如下方式调用它:
new HeartbeatProcessor<>(elements, "heartbeat", Duration.ofSeconds(10))
.getFlux()
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1193 次 |
| 最近记录: |