反应性扩展滑动时间窗口

Ned*_*nov 5 c# system.reactive

我有一系列的股票报价,我想在过去一小时内获取所有数据并对其进行一些处理.我试图用反应式扩展2.0实现这一点.我在另一篇文章中读到使用Interval,但我认为这已被弃用.

Eni*_*ity 9

这种扩展方法会解决您的问题吗?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}
Run Code Online (Sandbox Code Playgroud)