一种以均匀间隔推送缓冲事件的方法

maj*_*cha 12 .net c# system.reactive

我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发形式进行)并进一步释放它们,但是以偶数间隔逐个释放它们.像这样:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Run Code Online (Sandbox Code Playgroud)

因为我对Rx很新,所以我不确定是否已经有一个主题或运算符.也许它可以通过作曲来完成?

更新:

感谢 Richard Szalay指出Drain操作符,我找到了James Miles的Drain操作符使用的另一个例子.这是我如何设法让它在WPF应用程序中工作:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();
Run Code Online (Sandbox Code Playgroud)

我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何异常(我需要学习如何处理Rx中的异常).Process方法直接修改了UI状态,但我想从它中生成一个IObservable非常简单(使用ISubject?).

更新:

与此同时,我一直在试验ISubject,下面的课程做了我想要的 - 它及时让出缓冲的Ts:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}
Run Code Online (Sandbox Code Playgroud)

为了清楚起见,这个天真的实现从OnCompleted和OnError中剥离,也只允许单个订阅.

Ric*_*lay 11

它实际上比听起来更诡异.

使用Delay不起作用,因为值仍将批量发生,只是略有延迟.

使用Interval任何一个CombineLatestZip不起作用,因为前者将导致跳过源值,后者将缓冲间隔值.

我认为新的Drain运算符(在1.0.2787.0中添加),结合Delay应该可以做到:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
Run Code Online (Sandbox Code Playgroud)

Drain运营商工作方式类似SelectMany,但等待,直到前面的输出调用与下一个值的选择之前完成.它仍然不正是你所追求的(在一个块中的第一个值也将被推迟),但它很接近:在使用上面现在符合您的大理石图.

编辑:显然Drain在框架中不起作用SelectMany.我会在官方论坛上寻求一些建议.与此同时,这里是Drain的一个实现,可以完成你所追求的目标:

编辑09/11:修复了实施中的错误和更新的使用情况,以匹配您请求的大理石图.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}
Run Code Online (Sandbox Code Playgroud)