Reactive的"缓冲直到安静"行为?

dlf*_*dlf 3 c# reactive-programming system.reactive

我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:

  1. 第一个T通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的T通知转发为单个聚合IList<T>通知.
  4. 如果缓冲区大小在倒计时到期之前超过某个最大值,则无论如何都要发送它.

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.

希望放弃任何的T通知; 只是缓冲它们.

有这样的事情存在,还是我需要自己编写?

sup*_*opi 6

在SO上存在一些类似的问题但不完全像这样.这是一个扩展方法,可以解决这个问题.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}
Run Code Online (Sandbox Code Playgroud)

  • 当然.`GroupByUntil`对传入的值进行分组,直到作为第二个参数提供的`Observable`提供一个值.在这种情况下,我们将所有通知分组到同一组,并等待来自"Throttle"或"Buffer"的值(因此是"Merge").前者确保沉默阈值,后者确保最大限度. (2认同)

Nia*_*ton 6

有趣的运营商.Supertopi的答案很好,但可以做出改进.如果maxAmount很大,和/或通知率很高,那么使用Buffer将通过分配不久之后被丢弃的缓冲区来刻录GC.

为了GroupBymaxAmount到达之后关闭每个Observable ,您不需要捕获Buffer所有这些元素只是为了知道它何时已满.根据Supertopi的回答,您可以将其略微改为以下内容.代替收集的BuffermaxAmount元素,它已经看到后它只是信号maxAmount流上的元件.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Take(maxAmount)
                                                 .LastAsync()
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}
Run Code Online (Sandbox Code Playgroud)