反应式扩展是否支持滚动缓冲?

Ken*_*art 23 .net c# buffer sliding-window system.reactive

我正在使用反应式扩展将数据整理到100毫秒的缓冲区:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);
Run Code Online (Sandbox Code Playgroud)

这很好用.但是,我想要的行为与Buffer操作提供的行为略有不同.基本上,如果收到另一个数据项,我想重置计时器.只有当整个100毫秒没有收到数据时我才能处理它.这开启了永不处理数据的可能性,因此我还应该能够指定最大计数.我会想象一下:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
Run Code Online (Sandbox Code Playgroud)

我已经环顾四周,在Rx中找不到这样的东西?任何人都可以确认/否认这个吗?

Col*_*nic 16

这可以通过结合内置WindowThrottle方法来实现Observable.首先,让我们解决忽略最大计数条件的简单问题:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Run Code Online (Sandbox Code Playgroud)

强大的Window方法完成了繁重的工作.现在很容易看到如何添加最大数量:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Run Code Online (Sandbox Code Playgroud)

我会在我的博客上写一篇解释这个的帖子.https://gist.github.com/2244036

Window方法的文档:

  • 使用上述 BufferUntilInactive 场景 - 如果订阅者比生产者慢,您可能会看到下一组窗口项将被缓冲并且不会被推送到订阅者的场景,除非生成了一个项目...... (2认同)

Eni*_*ity 14

我写了一个扩展来完成你所经历的大部分工作 - BufferWithInactivity.

这里是:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}
Run Code Online (Sandbox Code Playgroud)