Jör*_*ann 1 .net reactive-programming system.reactive
我想知道是否存在一种方法来获取可观察的流并使用* While运算符,尤其是TakeWhile,SkipWhile和BufferWhile,以便在满足bool“ while”条件时,它们的订阅者不会收到.OnComplete?
当我开始使用.TakeWhile / SkipWhile和BufferWhile运算符时,我假定它们不会终止/ .OnComplete(),而只是在满足布尔条件时才发出/不发出。
举个例子可能更有意义:
我有一个bool标志,指示一个实例是否忙,以及一个可观察的数据流:
private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
Run Code Online (Sandbox Code Playgroud)
..并像这样(简化)使用/设置RX流
private void SetupRx()
{
ConsumerSubscription = Producer
.SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
.BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
.Subscribe(i => DoSomething(i));
}
private void DoSomething(int i)
{
try
{
IsBusy = true;
// ... do something
}
finally
{
IsBusy = false;
}
}
Run Code Online (Sandbox Code Playgroud)
每当IsBusy / IgnoreChanges标志从true切换为false并返回,但保持流保持活动状态时,.SkipeWhile / .BufferWhile不应完成/ OnComplete(..)。
开箱即用RX.Net可以做到这一点吗?和/或有人知道如何做到这一点吗?
要从源中删除OnCompleted消息IObservable<T>,只需Concat使用Observable.Never<T>():
source.TakeWhile(condition).Concat(Observable.Never<T>())
Run Code Online (Sandbox Code Playgroud)
要手动订阅IObservable<T>源,以便仅在手动取消订阅时才终止订阅,可以使用Publish和IConnectableObservable<T>:
var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();
Run Code Online (Sandbox Code Playgroud)
话虽这么说,我认为您的做法不正确。如果正确完成,则不需要上述技巧。查看您的查询:
ConsumerSubscription = Producer
// Drop the producer's stream of ints whenever the IgnoreChanges flag
// is set to true, but forward them whenever the IgnoreChanges flag is set to false
.SkipWhile(_ => IgnoreChanges == true)
// For all streamed instances buffer them as long as we are busy
// handling the previous one(s)
.BufferWhile(_ => IsBusy == true)
.Subscribe(i => DoSomething(i));
Run Code Online (Sandbox Code Playgroud)
您应该使用.Where(_ => !IgnoreChanges)而不是.SkipWhile(_ => IgnoreChanges)。
你应该使用.Buffer(_ => IsBusy.SkipWhile(busy => busy))一个BehaviorSubject<bool> IsBusy替代.BufferWhile(_ => IsBusy)。
完整的代码如下所示:
private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
private void SetupRx()
{
ConsumerSubscription = Producer
.Where(_ => !IgnoreChanges)
.Buffer(_ => IsBusy.SkipWhile(busy => busy))
.Subscribe(buffer => DoSomething(buffer));
}
private void DoSomething(IList<int> buffer)
{
try
{
IsBusy.OnNext(true);
// Do something
}
finally
{
IsBusy.OnNext(false);
}
}
Run Code Online (Sandbox Code Playgroud)
下一个改进是尝试摆脱这种情况BehaviorSubject<bool> IsBusy。您希望避免使用主题,因为主题是您必须管理的状态。