相关疑难解决方法(0)

反应式扩展是否支持滚动缓冲?

我正在使用反应式扩展将数据整理到100毫秒的缓冲区:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);
Run Code Online (Sandbox Code Playgroud)

这很好用.但是,我想要的行为与Buffer操作提供的行为略有不同.基本上,如果收到另一个数据项,我想重置计时器.只有当整个100毫秒没有收到数据时我才能处理它.这开启了永不处理数据的可能性,因此我还应该能够指定最大计数.我会想象一下:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
Run Code Online (Sandbox Code Playgroud)

我已经环顾四周,在Rx中找不到这样的东西?任何人都可以确认/否认这个吗?

.net c# buffer sliding-window system.reactive

23
推荐指数
2
解决办法
4132
查看次数

如何在RX中实现超时缓冲

我需要实现一个事件处理,即在没有新事件到达某个时间段时延迟完成.(我必须在文本缓冲区更改时排队解析任务,但我不想在用户仍在键入时启动解析.)

我是RX的新手,但据我所知,我需要BufferWithTime和Timeout方法的组合.我想这是这样的:它会缓冲事件,直到在后续事件之间的指定时间段内定期接收事件.如果事件流中存在间隙(长于时间跨度),则应该返回传播到目前为止缓冲的事件.

看看Buffer和Timeout是如何实现的,我可以实现我的BufferWithTimeout方法(如果每个人都有一个,请与我分享),但我想知道这是否可以通过结合现有方法来实现.有任何想法吗?

.net system.reactive

4
推荐指数
2
解决办法
2608
查看次数

Reactive的"缓冲直到安静"行为?

我的问题有点像Nagle算法创建的问题,但不完全正确.我想要的是将OnNext通知缓冲IObservable<T>到一系列IObservable<IList<T>>s中,如下所示:

  1. 第一个T通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果T在倒计时到期之前收到另一个通知,请将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的T通知转发为单个聚合IList<T>通知.
  4. 如果缓冲区大小在倒计时到期之前超过某个最大值,则无论如何都要发送它.

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是"在第一个通知到达时启动计时器,当其他通知到达时重启它"我想要的行为,并且它还发送一个空列表在如果没有从下面生成通知,则每个时间窗口的结束.

希望放弃任何的T通知; 只是缓冲它们.

有这样的事情存在,还是我需要自己编写?

c# reactive-programming system.reactive

3
推荐指数
2
解决办法
784
查看次数