Ale*_*exC 12 c# system.reactive
我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示:
IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);
Run Code Online (Sandbox Code Playgroud)
这将从内部observable返回所有未来的项目,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) .
理由是服务定期ping出定期状态更新.例如:
var myStatus = Observable.FromEvent(
h=>this.StatusUpdate+=h,
h=>this.StatusUpdate-=h);
var messageBusStatusPinger = myStatus
.RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
.Subscribe(update => _messageBus.Send(update));
Run Code Online (Sandbox Code Playgroud)
这样的事情存在吗?还是我过度估计它的用处?
谢谢,亚历克斯
PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx.
类似于Matthew的解决方案,但是这里的计时器在源中接收到每个元素之后开始,我认为这更正确(但差异不太重要):
public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
return inner.Select(x =>
Observable.Interval(maxQuietPeriod)
.Select(_ => x)
.StartWith(x)
).Switch();
}
Run Code Online (Sandbox Code Playgroud)
而且测试:
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
.Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));
source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
Run Code Online (Sandbox Code Playgroud)
您应该看到1打印10次(来自5个来源,5个在沉默期间重复),然后很多,2因为你从源头获得一个,另外4个来自每个之间的沉默,然后是无限的3.
这个相当简单的查询完成了工作:
var query =
source
.Select(s =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.StartWith(s)
.Select(x => s))
.Switch();
Run Code Online (Sandbox Code Playgroud)
永远不要低估它的力量.Switch().