“无止境” TakeWhile,BufferWhile和SkipWhile RX.Net序列

Jör*_*ann 1 .net reactive-programming system.reactive

我想知道是否存在一种方法来获取可观察的流并使用* While运算符,尤其是TakeWhile,SkipWhile和BufferWhile,以便在满足bool“ while”条件时,它们的订阅者不会收到.OnComplete?

当我开始使用.TakeWhile / SkipWhile和BufferWhile运算符时,我假定它们不会终止/ .OnComplete(),而只是在满足布尔条件时才发出/不发出。

举个例子可能更有意义:

我有一个bool标志,指示一个实例是否忙,以及一个可观察的数据流:

private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
Run Code Online (Sandbox Code Playgroud)

..并像这样(简化)使用/设置RX流

private void SetupRx()
{
    ConsumerSubscription = Producer
        .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
        .Subscribe(i => DoSomething(i));
}

private void DoSomething(int i)
{
    try
    {
        IsBusy = true;
        // ... do something
    }
    finally
    {
        IsBusy = false;
    }
}
Run Code Online (Sandbox Code Playgroud)

每当IsBusy / IgnoreChanges标志从true切换为false并返回,但保持流保持活动状态时,.SkipeWhile / .BufferWhile不应完成/ OnComplete(..)。

开箱即用RX.Net可以做到这一点吗?和/或有人知道如何做到这一点吗?

Tim*_*lds 5

要从源中删除OnCompleted消息IObservable<T>,只需Concat使用Observable.Never<T>()

source.TakeWhile(condition).Concat(Observable.Never<T>())
Run Code Online (Sandbox Code Playgroud)

要手动订阅IObservable<T>源,以便仅在手动取消订阅时才终止订阅,可以使用PublishIConnectableObservable<T>

var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();
Run Code Online (Sandbox Code Playgroud)

话虽这么说,我认为您的做法不正确。如果正确完成,则不需要上述技巧。查看您的查询:

ConsumerSubscription = Producer
    // Drop the producer's stream of ints whenever the IgnoreChanges flag
    // is set to true, but forward them whenever the IgnoreChanges flag is set to false
    .SkipWhile(_ => IgnoreChanges == true) 
    // For all streamed instances buffer them as long as we are busy
    // handling the previous one(s)
    .BufferWhile(_ => IsBusy == true) 
    .Subscribe(i => DoSomething(i));
Run Code Online (Sandbox Code Playgroud)

您应该使用.Where(_ => !IgnoreChanges)而不是.SkipWhile(_ => IgnoreChanges)

你应该使用.Buffer(_ => IsBusy.SkipWhile(busy => busy))一个BehaviorSubject<bool> IsBusy替代.BufferWhile(_ => IsBusy)

完整的代码如下所示:

private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }

private void SetupRx()
{
    ConsumerSubscription = Producer
        .Where(_ => !IgnoreChanges)
        .Buffer(_ => IsBusy.SkipWhile(busy => busy))
        .Subscribe(buffer => DoSomething(buffer));
}

private void DoSomething(IList<int> buffer)
{
    try
    {
        IsBusy.OnNext(true);
        // Do something
    }
    finally
    {
        IsBusy.OnNext(false);
    }
}
Run Code Online (Sandbox Code Playgroud)

下一个改进是尝试摆脱这种情况BehaviorSubject<bool> IsBusy。您希望避免使用主题,因为主题是您必须管理的状态。