我正在使用反应式扩展将数据整理到100毫秒的缓冲区:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
Run Code Online (Sandbox Code Playgroud)
这很好用.但是,我想要的行为与Buffer操作提供的行为略有不同.基本上,如果收到另一个数据项,我想重置计时器.只有当整个100毫秒没有收到数据时我才能处理它.这开启了永不处理数据的可能性,因此我还应该能够指定最大计数.我会想象一下:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
Run Code Online (Sandbox Code Playgroud)
我已经环顾四周,在Rx中找不到这样的东西?任何人都可以确认/否认这个吗?
我需要实现一个事件处理,即在没有新事件到达某个时间段时延迟完成.(我必须在文本缓冲区更改时排队解析任务,但我不想在用户仍在键入时启动解析.)
我是RX的新手,但据我所知,我需要BufferWithTime和Timeout方法的组合.我想这是这样的:它会缓冲事件,直到在后续事件之间的指定时间段内定期接收事件.如果事件流中存在间隙(长于时间跨度),则应该返回传播到目前为止缓冲的事件.
看看Buffer和Timeout是如何实现的,我可以实现我的BufferWithTimeout方法(如果每个人都有一个,请与我分享),但我想知道这是否可以通过结合现有方法来实现.有任何想法吗?
我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:
T通知到达时,将其添加到缓冲区并开始倒计时T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时T通知转发为单个聚合IList<T>通知.IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.
我不希望放弃任何的T通知; 只是缓冲它们.
有这样的事情存在,还是我需要自己编写?