Rx observable 会在特定超时到期时发布一个值

Fla*_*ack 5 c# system.reactive

我有一个返回可观察对象的方法。这个 observable 应该(如果 evetything 正常工作)每秒发布一个值。我想要做的是让它发布一些自定义警报值,如果某个时间过去了而没有输出。

private IObservable<string> GetStatus()
{
    return statusProvider
                .Subscribe(statusKey)  //returns an IObservable<string>
                .Select(st => st.ToUpper())
                .DistinctUntilChanged()
                .TakeUntil(disposed)
                .Replay(1)
                .RefCount();
}
Run Code Online (Sandbox Code Playgroud)

是否有一种简单的方法可以让我修改上述内容,以便如果 30 秒内没有状态更新,则 statusProvider 发布“坏”,然后如果更新确实在此之后进入,它会像往常一样发布并且计时器是再次重启到30秒?

Bra*_*don 2

这是一个方法。启动一个计时器,当它到期时会产生“坏”结果。每次您的 statusProvider 产生状态时,计时器都会重置。

var statusSignal = statusProvider
            .Subscribe(statusKey)  //returns an IObservable<string>
            .Select(st => st.ToUpper())
            .Publish()
            .RefCount();

// An observable that produces "bad" after a delay and then "hangs indefinately" after that
var badTimer = Observable
    .Return("bad")
    .Delay(TimeSpan.FromSeconds(30))
    .Concat(Observable.Never<string>());

// A repeating badTimer that resets the timer whenever a good signal arrives.
// The "indefinite hang" in badTimer prevents this from restarting the timer as soon
// as it produces a "bad".  Which prevents you from getting a string of "bad" messages
// if the statusProvider is silent for many minutes.
var badSignal = badTimer.TakeUntil(statusSignal).Repeat();

// listen to both good and bad signals.
return Observable
    .Merge(statusSignal, badSignal)
    .DistinctUntilChanged()
    .Replay(1)
    .RefCount();
Run Code Online (Sandbox Code Playgroud)