Reactive Extensions(Rx) - 当间隔中没有值时,具有最后已知值的样本

Slu*_*art 5 .net c# reactive-programming system.reactive

我有一个可观察的流,以不一致的间隔产生值,如下所示:

------1---2------3----------------4--------------5---
Run Code Online (Sandbox Code Playgroud)

我想对此进行采样,但是一旦产生了一个值,就没有任何空样本:

------1---2------3----------------4--------------5-----

----_----1----2----3----3----3----4----4----4----5----5
Run Code Online (Sandbox Code Playgroud)

我显然认为Replay().RefCount()可以在这里使用提供最后一个已知的值,Sample()但因为它没有重新订阅源流它没有工作.

有关如何做到这一点的任何想法?

Jam*_*rld 6

假设您的源流是,IObservable<int> xs那么您的采样间隔是Timespan duration:

xs.Publish(ps => 
    Observable.Interval(duration)
        .Zip(ps.MostRecent(0), (x,y) => y)
        .SkipUntil(ps))
Run Code Online (Sandbox Code Playgroud)

对于通用的解决方案,更换0参数MostRecentdefault(T)其中IObservable<T>为源的流类型.

目的Publish是防止订阅副作用,因为我们需要订阅源两次 - 一次为MostRecent一次,一次为SkipUntil.后者的目的是在源流的第一个事件之前防止采样值.

如果您不关心在源流的第一个事件之前获取默认值,则可以简化此操作:

Observable.Interval(duration)
    .Zip(xs.MostRecent(0), (x,y) => y)
Run Code Online (Sandbox Code Playgroud)

相关的运营商WithLatestFrom也可能是有意义的; 这是在下一个版本中来到Rx.详情请见此处.