只有当值已经改变了一定的余量时,Rx IObservable才会产生值

jam*_*nor 3 .net c# system.reactive c#-4.0

我有一种感觉,这可能是一个非常简单的扩展方法,我已经错过但我看不到它...

我基本上想要一个产生流的流,其中值在每个新值上缓慢递增.我希望节流/采样,而不是时间,而是"容忍".例如

var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30
Run Code Online (Sandbox Code Playgroud)

当我有[1,4,20,33]等序列时,我希望在值变化超过最后一个的15时输出 - 这将导致:[1,20].如果价值变化为12将导致:[1,20,33]

这是否有内置的Rx扩展?理想情况下,它可以在所有数字类型上工作,而无需为每个类型编写重载

Mat*_*lay 7

我认为这非常适合 Observable.Scan

var ob = Enumerable.Range(0, 30).ToObservable();
var largeMovingOb = ob.Scan((acc, i) => acc + 10 > i ? acc : i)
  .DistinctUntilChanged();
Run Code Online (Sandbox Code Playgroud)