计算实时并发流的数量

Jam*_*s L 3 c# system.reactive

我有一个流。我想知道将其转换为当前许多内部流的实时计数,即未完成或出错。

我将如何实施 CountLiveStreams?

var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
Run Code Online (Sandbox Code Playgroud)

谢谢!

The*_*ias 5

这是CountLiveStreams运算符的实现:

public static IObservable<int> CountLiveStreams<T>(
    this IObservable<IObservable<T>> streamOfStreams)
{
    return streamOfStreams
        .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
            .Prepend(1).Append(-1))
        .Merge()
        .Scan(0, (accumulator, delta) => accumulator + delta)
        .Prepend(0);
}
Run Code Online (Sandbox Code Playgroud)

每个发出的流都被转换为IObservable<int>发出 2 个值的 ,开始时值为 1,末尾值为 -1。然后将所有流生成的所有值对合并为一个IObservable<int>,最后使用Scan运算符累加所有这些数字。