Reactive Extensions:是否可以订阅Buffer操作结果的Sum?

Ale*_*sky 3 c# system.reactive

我试图获得关于Aggregate函数(即Sum)结果的通知,它对无限序列的部分序列进行操作(最顶层,数据源序列永远不会完成).问题可以在这里看到:

var seq = Observable.Interval(TimeSpan.FromMilliseconds(20)).Buffer(10);
seq.Sum(l => l.Sum())
            .Subscribe(n =>
                s_log.DebugFormat("Got {0}", n));
Run Code Online (Sandbox Code Playgroud)

Lambda l.Sum()按预期调用(计算部分和),但从不打印"Got ..."行,因为永远不会调用subscriber.我怀疑它与某种原始序列的"永无止境"特征有关.有限序列:

 Observable.Range(1,100).Buffer(10);
Run Code Online (Sandbox Code Playgroud)

按预期工作.所以问题很简单:如何将无限序列的部分片段"标记"为"完整",因此聚合函数将分别对它们起作用(并将其结果推送给订阅者)?

Shl*_*omo 5

扫描是你的朋友:

seq.Scan(0L, (l1, l2) => l1 + l2.Sum())
   .Subscribe(n => Console.WriteLine("Got {0}", n));
Run Code Online (Sandbox Code Playgroud)