如何从队列中创建 IObservable,以便在队列为空时序列不会结束?

Tim*_*ong 4 c# reactive-programming system.reactive

我正在研究使用 Reactive Extensions for .NET (Rx) 的东西,我想要一个从队列(或类似的)中获取输入的序列。

我试过这样做:

    static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>();
    //...
    var observableTransactionSource = TransactionQueue.ToObservable();
    //...
    observableTransactionSource.Subscribe(transactionObserver);
Run Code Online (Sandbox Code Playgroud)

它在一定程度上起作用,但是当队列为空时序列完成。我不想要一个空队列来结束序列。空并不意味着结束,它只是意味着“此刻不再”。

有没有办法在队列为空时停止序列完成,或者我应该以不同的方式思考整个问题?

Jam*_*rld 5

调用ToObservable()充满了问题,正如我在这里解释的那样它只会使用IEnumerable<T>并消耗队列的快照。

在这种情况下,您最好使用 aSubject<T>来支持您的事件。由于 Rx 语法指定您必须序列化事件的传递,因此它已经具有排队语义。只需调用OnNext<T>该主题即可发布事件。

如果您需要确保在发布事件后发生的订阅不会错过事件,请使用ReplaySubject<T>.

如果使用的主题与您有关,那么您可能需要查看此博客文章。总而言之,您对队列的使用表明在这里使用主题是可以的,但您可能需要考虑是否可以使用像Observable.FromEvent.