对Observable进行"速率限制"消耗的最佳方法是什么?

Igo*_*orM 6 system.reactive

我有一堆事件进来,我必须毫无损失地执行所有这些事件,但我想确保它们在适当的时间段被缓冲和消耗.有人有解决方案吗?

我无法在Rx中找到任何可以在没有事件丢失的情况下执行此操作的操作员(Throttle - looses events).我也考虑过缓冲,延迟等......无法找到一个好的解决方案.

我试图在中间放一个计时器,但不知何故它根本不起作用:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }
Run Code Online (Sandbox Code Playgroud)

yam*_*men 12

问题不是100%明确,所以我做了一些假设.

Observable.Delay 不是你想要的,因为这将产生从每个事件到来时的延迟,而不是创建甚至处理的时间间隔.

Observable.Buffer 不是你想要的,因为这将导致每个给定间隔中的所有事件传递给你,而不是一次传递给你.

所以我相信你正在寻找一种能够创造某种节拍器的解决方案,并为每个节拍提供一个事件.这可以通过节拍器和连接到您的音源来天真地构建:Observable.IntervalZip

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));
Run Code Online (Sandbox Code Playgroud)

这将每5秒触发一次(在上例中),并按顺序提供原始项目.

这个解决方案的唯一问题是,如果你没有任何更多的源元素(比如说)10秒,当源元素到达时,它们将被立即发送出去,因为一些'触发'事件正在那里等待它们.该场景的大理石图:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g
Run Code Online (Sandbox Code Playgroud)

这是一个非常合理的问题.这里有两个问题可以解决它:

Rx IObservable缓冲以平滑突发事件

一种以均匀间隔推送缓冲事件的方法

提供的解决方案是主要的Drain扩展方法和辅助Buffered扩展.我把它们修改得更简单了(不需要Drain,只需使用Concat).用法是:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));
Run Code Online (Sandbox Code Playgroud)

扩展方法StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}
Run Code Online (Sandbox Code Playgroud)