NoP*_*God 8 c# system.reactive
Observable.TakeWhile允许你运行一个序列,只要条件为真(使用委托,所以我们可以对实际的序列对象执行计算),但是它会在每个元素之前检查这个条件.如何在每个元素之后执行相同的检查?
以下代码演示了此问题
void RunIt()
{
List<SomeCommand> listOfCommands = new List<SomeCommand>();
listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);
obs.Subscribe(x =>
{
Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
});
}
class SomeCommand
{
public int CurrentIndex;
public int TotalCount;
}
Run Code Online (Sandbox Code Playgroud)
这输出
1 of 3
2 of 3
Run Code Online (Sandbox Code Playgroud)
我无法得到第三个元素
看看这个例子,你可能认为我所要做的就是改变我的状况 -
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);
Run Code Online (Sandbox Code Playgroud)
但是,observable永远不会完成(因为在我的真实世界代码中,流不会在这三个命令之后结束)
Ric*_*lay 15
没有内置的运算符来执行您所要求的操作,但是这里有一个Publish用于运行两个查询而只订阅底层的observable一次:
// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
this IObservable<T> source, Func<T, bool> predicate)
{
return source.Publish(co => co.TakeWhile(predicate)
.Merge(co.SkipWhile(predicate).Take(1)));
}
Run Code Online (Sandbox Code Playgroud)
然后:
var obs = listOfCommands.ToObservable()
.TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
Run Code Online (Sandbox Code Playgroud)
最终编辑:
我在此线程中基于Sergey的TakeWhileInclusive实现实现了解决方案- 如何根据事件中的条件完成Rx Observable
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, Func<TSource, bool> predicate)
{
return Observable
.Create<TSource>(o => source.Subscribe(x =>
{
o.OnNext(x);
if (predicate(x))
o.OnCompleted();
},
o.OnError,
o.OnCompleted
));
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5043 次 |
| 最近记录: |