dlf*_*dlf 3 c# reactive-programming system.reactive
我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:
T通知到达时,将其添加到缓冲区并开始倒计时T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时T通知转发为单个聚合IList<T>通知.IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.
我不希望放弃任何的T通知; 只是缓冲它们.
有这样的事情存在,还是我需要自己编写?
在SO上存在一些类似的问题但不完全像这样.这是一个扩展方法,可以解决这个问题.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Run Code Online (Sandbox Code Playgroud)
有趣的运营商.Supertopi的答案很好,但可以做出改进.如果maxAmount很大,和/或通知率很高,那么使用Buffer将通过分配不久之后被丢弃的缓冲区来刻录GC.
为了GroupBy在maxAmount到达之后关闭每个Observable ,您不需要捕获Buffer所有这些元素只是为了知道它何时已满.根据Supertopi的回答,您可以将其略微改为以下内容.代替收集的Buffer的maxAmount元素,它已经看到后它只是信号maxAmount流上的元件.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
784 次 |
| 最近记录: |