当没有值传入时,是否有一个Rx方法可以定期重复上一个值?

Ale*_*exC 12 c# system.reactive

我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示:

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);
Run Code Online (Sandbox Code Playgroud)

这将从内部observable返回所有未来的项目,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) .

理由是服务定期ping出定期状态更新.例如:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h,
    h=>this.StatusUpdate-=h);

var messageBusStatusPinger = myStatus
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
    .Subscribe(update => _messageBus.Send(update));
Run Code Online (Sandbox Code Playgroud)

这样的事情存在吗?还是我过度估计它的用处?

谢谢,亚历克斯

PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx.

yam*_*men 6

类似于Matthew的解决方案,但是这里的计时器在源中接收到每个元素之后开始,我认为这更正确(但差异不太重要):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{    
    return inner.Select(x => 
        Observable.Interval(maxQuietPeriod)
                  .Select(_ => x)
                  .StartWith(x)
    ).Switch();
}
Run Code Online (Sandbox Code Playgroud)

而且测试:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
                       .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
Run Code Online (Sandbox Code Playgroud)

您应该看到1打印10次(来自5个来源,5个在沉默期间重复),然后很多,2因为你从源头获得一个,另外4个来自每个之间的沉默,然后是无限的3.


Eni*_*ity 6

这个相当简单的查询完成了工作:

var query =
    source
        .Select(s =>
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .StartWith(s)
                .Select(x => s))
        .Switch();
Run Code Online (Sandbox Code Playgroud)

永远不要低估它的力量.Switch().