Jam*_*s L 3 c# system.reactive
我有一个流。我想知道将其转换为当前许多内部流的实时计数,即未完成或出错。
我将如何实施 CountLiveStreams?
var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
Run Code Online (Sandbox Code Playgroud)
谢谢!
这是CountLiveStreams运算符的实现:
public static IObservable<int> CountLiveStreams<T>(
this IObservable<IObservable<T>> streamOfStreams)
{
return streamOfStreams
.Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
.Prepend(1).Append(-1))
.Merge()
.Scan(0, (accumulator, delta) => accumulator + delta)
.Prepend(0);
}
Run Code Online (Sandbox Code Playgroud)
每个发出的流都被转换为IObservable<int>发出 2 个值的 ,开始时值为 1,末尾值为 -1。然后将所有流生成的所有值对合并为一个IObservable<int>,最后使用Scan运算符累加所有这些数字。
| 归档时间: |
|
| 查看次数: |
72 次 |
| 最近记录: |