如何在RX中实现超时缓冲

Gas*_*agy 4 .net system.reactive

我需要实现一个事件处理,即在没有新事件到达某个时间段时延迟完成.(我必须在文本缓冲区更改时排队解析任务,但我不想在用户仍在键入时启动解析.)

我是RX的新手,但据我所知,我需要BufferWithTime和Timeout方法的组合.我想这是这样的:它会缓冲事件,直到在后续事件之间的指定时间段内定期接收事件.如果事件流中存在间隙(长于时间跨度),则应该返回传播到目前为止缓冲的事件.

看看Buffer和Timeout是如何实现的,我可以实现我的BufferWithTimeout方法(如果每个人都有一个,请与我分享),但我想知道这是否可以通过结合现有方法来实现.有任何想法吗?

cwh*_*ris 13

这是一个相当古老的问题,但我确实认为以下答案值得一提,因为所有其他解决方案都迫使用户手动订阅,跟踪更改等.

我提供以下作为"Rx-y"解决方案.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());
Run Code Online (Sandbox Code Playgroud)

基本上,源是窗口化的,直到根据最新窗口定义的一些可观察的.创建一个新窗口(分组可观察),我们使用该窗口确定窗口何时关闭.在这种情况下,我在5秒不活动或最大长度为10(9 + 1)后关闭窗口.


Ric*_*lay 3

我想BufferWithTime这就是你所追求的。

没有内置任何内容,但类似这样的东西应该可以工作:

注意:如果源发生错误,则不会刷新缓冲区。这与当前(或我上次检查时的当前)功能相匹配BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

                timeoutDisposable.Disposable = 
                    scheduler.Schedule(flushBuffer, timeout);
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}
Run Code Online (Sandbox Code Playgroud)