如果在给定时间内没有发出任何元素,如何使 Flux 发出额外的元素?

lbi*_*ger 6 project-reactor

我正在为 WebFlux SSE 端点实现心跳。为了避免客户端超时,我想确保至少每隔(比如 10 秒)发出一个元素。

我提出了以下解决方案,无论是否已发出真实元素,每 10 秒发出一次心跳元素:

originalFlux.mergeWith(Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(10)).map(ignored -> "heartbeat")
Run Code Online (Sandbox Code Playgroud)

这对于我的用例来说可能已经足够好了,但我仍然想知道是否只有在过去 10 秒内没有发出任何实际元素的情况下才可以发出心跳。我使用了timeout运算符,它完全实现了我正在寻找的计时行为,但是它会发出错误并取消,originalFlux而不是仅仅发出额外的元素。

以下代码通过timeout了我的测试,但看起来太复杂,据我所知,如果originalFlux在取消和重新订阅之间发出元素,则可能会丢失元素:

ConnectableFlux<String> sharedOriginalFlux = originalFlux.publish();
CompletableFuture<Disposable> eventualSubscription = new CompletableFuture<>();
return addHeartbeat(sharedOriginalFlux)
    .doOnSubscribe(ignored -> eventualSubscription.complete(sharedOriginalFlux.connect()))
    .doFinally(ignored -> eventualSubscription.thenAccept(Disposable::dispose))

private Flux<String> addHeartbeat(Flux<String> sharedOriginalFlux) {
    return sharedOriginalFlux.timeout(
        Duration.ofSeconds(10),
        Flux.mergeSequential(
            Mono.just("heartbeat"),
            Flux.defer(() -> addHeartbeat(sharedOriginalFlux))));
}
Run Code Online (Sandbox Code Playgroud)

有没有一种简单且安全的方法来做到这一点?

Mic*_*rry 2

它不一定更简单,但另一种选择可能是创建一个单独的处理器,可以包装原始处理器Flux以提供心跳(不应错过任何元素):

public class HeartbeatProcessor<T> {

    private final FluxProcessor<T, T> processor;
    private final FluxSink<T> sink;
    private final T heartbeatValue;
    private final Duration heartbeatPeriod;
    private Disposable d;

    public HeartbeatProcessor(Flux<T> orig, T heartbeatValue, Duration heartbeatPeriod) {
        this.heartbeatValue = heartbeatValue;
        this.heartbeatPeriod = heartbeatPeriod;
        this.processor = DirectProcessor.<T>create().serialize();
        this.sink = processor.sink();
        this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
        orig.subscribe(this::emit);
    }

    private void emit(T val) {
        sink.next(val);
        d.dispose();
        this.d = Mono.just(heartbeatValue).delayElement(heartbeatPeriod).subscribe(this::emit);
    }

    public Flux<T> getFlux() {
        return processor;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后您可以按如下方式调用它:

new HeartbeatProcessor<>(elements, "heartbeat", Duration.ofSeconds(10))
        .getFlux()
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)