将两个Observable合并为一个具有更高优先级的Observable

Chr*_*ney 10 c# system.reactive

是否可以使用ReactiveExtensions来实现以下目标;

  • 两个Observable,一个是"高"优先,另一个是"低"

  • 将两个Observable合并为一个,然后可以订阅,意图产生的Observable将始终在任何低优先级项之前发出高优先级项.

据我所知,使用两个ConcurrentQueue集合可以更简单地实现这一点;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item);
Run Code Online (Sandbox Code Playgroud)

但是这种方法存在一些问题,比如不像Observable那样"可订阅"(所以一旦队列耗尽,处理就会结束而没有太多额外的guff将其推送到Task中).

此外,我有兴趣在队列上应用一些额外的过滤,比如限制和"直到变化",所以Rx看起来很自然.

Jam*_*rld 7

您所描述的当然是优先级队列。

Rx 是关于事件,而不是队列。当然,队列在 Rx被大量使用——但它们不是一流的概念,更多的是 Rx 概念的实现细节。

我们需要队列的一个很好的例子是处理慢观察者。事件在 Rx 中按顺序分派,如果事件到达的速度比观察者可以处理的速度快,那么它们必须排队等待观察者。如果有很多观察者,那么必须维护多个逻辑队列,因为观察者可能以不同的速度前进——而 Rx 选择不让它们保持同步。

“背压”是观察者向 observable 提供反馈的概念,以允许机制处理更快的 observable 的压力——例如合并或节流。Rx 没有引入背压的一流方式 - 可观察对象监控观察者的唯一内置方式是通过OnNext. 任何其他机制都需要带外。您的问题直接与背压有关,因为它仅在观察者缓慢的情况下才相关。

我提到这一切是为了证明我的主张,即 Rx 不是提供您正在寻找的那种优先调度的好选择——实际上,一流的排队机制似乎更合适。

要解决手头的问题,您需要在自定义运算符中自己管理优先级排队。重申这个问题:您要说的是,如果事件在观察者处理事件期间到达OnNext,从而需要分派事件,那么您想要分派而不是 Rx 使用的典型 FIFO 队列基于某些优先级。

需要注意的是,本着 Rx 不会让多个观察者保持锁步的精神,并发观察者可能会以不同的顺序看到事件,这对您来说可能是也可能不是问题。您可以使用诸如Publish获得订单一致性之类的机制- 但您可能不想这样做,因为在这种情况下事件交付的时间会变得非常不可预测且效率低下。

我确信有更好的方法可以做到这一点,但这里是基于优先级队列的交付的一个示例 - 您可以使用更好的队列实现将其扩展为适用于多个流和优先级(甚至每个事件的优先级)(例如基于 b 树的优先级队列)但我选择保持这个相当简单。即便如此,请注意代码必须解决的大量问题,围绕错误处理、完成等 - 我已经选择了何时发出信号,表明肯定有很多其他有效的选择。

总而言之,这个实现肯定让放弃了使用 Rx 的想法。它足够复杂,无论如何这里可能存在错误。正如我所说,可能有更简洁的代码(特别是考虑到我付出的最小努力!),但从概念上讲,无论实现如何,我都对这个想法感到不舒服:

public static class ObservableExtensions
{
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
        this IObservable<TSource> source,
        IObservable<TSource> lowPriority,
        IScheduler scheduler = null)
    {    
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<TSource>(o => {    
            // BufferBlock from TPL dataflow is used as it is
            // handily awaitable. package: Microsoft.Tpl.Dataflow        
            var loQueue = new BufferBlock<TSource>();
            var hiQueue = new BufferBlock<TSource>();
            var errorQueue = new BufferBlock<Exception>();
            var done = new TaskCompletionSource<int>();
            int doneCount = 0;
            Action incDone = () => {
                var dc = Interlocked.Increment(ref doneCount);
                if(dc == 2)
                    done.SetResult(0);
            };
            source.Subscribe(
                x => hiQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            lowPriority.Subscribe(
                x => loQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            return scheduler.ScheduleAsync(async(ctrl, ct) => {
                while(!ct.IsCancellationRequested)
                {
                    TSource nextItem;
                    if(hiQueue.TryReceive(out nextItem)
                      || loQueue.TryReceive(out nextItem))
                        o.OnNext(nextItem);

                    else if(done.Task.IsCompleted)
                    {
                        o.OnCompleted();
                        return;
                    }

                    Exception error;                        
                    if(errorQueue.TryReceive(out error))
                    {
                        o.OnError(error);
                        return;
                    }

                    var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);    
                    var loAvailableAsync = loQueue.OutputAvailableAsync(ct);                    
                    var errAvailableAsync =
                        errorQueue.OutputAvailableAsync(ct);
                    await Task.WhenAny(
                        hiAvailableAsync,
                        loAvailableAsync,
                        errAvailableAsync,
                        done.Task);
                }
            });
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

和示例用法:

void static Main()
{
    var xs = Observable.Range(0, 3);
    var ys = Observable.Range(10, 3);

    var source = ys.MergeWithLowPriorityStream(xs);

    source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}
Run Code Online (Sandbox Code Playgroud)

这将打印出ysfirst的元素,表明它们的优先级更高。

  • 谢谢 James,你的解释让我对 Rx 的实际目的和能力有了更多的了解。当然,你是对的,Rx 解决方案正在尝试将方钉安装在圆孔中。我突然觉得有必要再从头到尾读一遍 IntroToRx!再次感谢您花时间提供此答案,非常感谢。 (2认同)