节流Rx.Observable而不跳过值

Ath*_*ari 15 .net c# system.reactive

Throttle如果其他人跟随太快,则方法会从可观察序列中跳过值.但我需要一种方法来延迟它们.也就是说,我需要在项目之间设置最小延迟,而不是跳过任何项目.

实际例子:有一个Web服务可以接受不超过一秒的请求; 有一个用户可以单独或批量添加请求.没有Rx,我将创建一个列表和一个计时器.当用户添加请求时,我会将它们添加到列表中.在计时器事件中,我将检查列表是否为空.如果不是,我将发送请求并删除相应的项目.随着锁和所有的东西.现在,使用Rx,我可以Subject在用户添加请求时创建,添加项目.但我需要一种方法来确保Web服务不会因应用延迟而泛滥.

我是Rx的新手,所以也许我错过了一些明显的东西.

Eni*_*ity 7

有一个相当简单的方法来做你想要的东西EventLoopScheduler.

我开始使用一个observable,它会每0到3秒随机产生一次值.

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));
Run Code Online (Sandbox Code Playgroud)

现在,要立即生成此输出值,除非最后一个值在一秒钟之内,我这样做:

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });
Run Code Online (Sandbox Code Playgroud)

这有效地观察了源上的源EventLoopScheduler,然后在每次之后使其休眠1秒,OnNext以便它只能OnNext在唤醒后才开始下一次.

我测试它使用此代码:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

我希望这有帮助.

  • “Thread.Sleep”不是很糟糕吗?我一直认为为本质上是“计时器”的事情挂起线程是浪费资源。 (2认同)

yam*_*men 5

一个简单的扩展方法怎么样:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}
Run Code Online (Sandbox Code Playgroud)

用法:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
Run Code Online (Sandbox Code Playgroud)