Ath*_*ari 15 .net c# system.reactive
Throttle如果其他人跟随太快,则方法会从可观察序列中跳过值.但我需要一种方法来延迟它们.也就是说,我需要在项目之间设置最小延迟,而不是跳过任何项目.
实际例子:有一个Web服务可以接受不超过一秒的请求; 有一个用户可以单独或批量添加请求.没有Rx,我将创建一个列表和一个计时器.当用户添加请求时,我会将它们添加到列表中.在计时器事件中,我将检查列表是否为空.如果不是,我将发送请求并删除相应的项目.随着锁和所有的东西.现在,使用Rx,我可以Subject在用户添加请求时创建,添加项目.但我需要一种方法来确保Web服务不会因应用延迟而泛滥.
我是Rx的新手,所以也许我错过了一些明显的东西.
有一个相当简单的方法来做你想要的东西EventLoopScheduler.
我开始使用一个observable,它会每0到3秒随机产生一次值.
var rnd = new Random();
var xs =
Observable
.Generate(
0,
x => x < 20,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));
Run Code Online (Sandbox Code Playgroud)
现在,要立即生成此输出值,除非最后一个值在一秒钟之内,我这样做:
var ys =
Observable.Create<int>(o =>
{
var els = new EventLoopScheduler();
return xs
.ObserveOn(els)
.Do(x => els.Schedule(() => Thread.Sleep(1000)))
.Subscribe(o);
});
Run Code Online (Sandbox Code Playgroud)
这有效地观察了源上的源EventLoopScheduler,然后在每次之后使其休眠1秒,OnNext以便它只能OnNext在唤醒后才开始下一次.
我测试它使用此代码:
ys
.Timestamp()
.Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
.Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)
我希望这有帮助.
一个简单的扩展方法怎么样:
public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Select(x =>
Observable.Empty<T>()
.Delay(minDelay)
.StartWith(x)
).Concat();
}
Run Code Online (Sandbox Code Playgroud)
用法:
var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
Run Code Online (Sandbox Code Playgroud)