相关疑难解决方法(0)

如何使用Reactive Extensions使用最大窗口大小来限制事件?

场景:

我正在构建一个UI应用程序,每隔几毫秒从后端服务获取通知.收到新通知后,我想尽快更新用户界面.

因为我可以在很短的时间内收到大量的通知,而且我总是只关心最新的事件,所以我使用了Reactive Extensions框架的Throttle()方法.这允许我忽略紧跟新通知的通知事件,因此我的UI保持响应.

问题:

假设我将通知事件的事件流限制为50ms,并且后端每隔10ms发送一次通知,Thottle()方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口.在这里,我需要一些额外的行为来指定类似超时的东西,这样我就可以每秒至少检索一个事件,如果有这么高的事件吞吐量.如何使用Reactive Extensions执行此操作?

c# throttling system.reactive

10
推荐指数
1
解决办法
2786
查看次数

RX - 可观察序列中元素的组/批突发

我有一个可观察的序列.插入第一个元素时,我想在计时器的时间跨度内启动计时器并批量后续插入的元素.然后,计时器不会再次启动,直到序列中插入另一个元素.

所以像这样:

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 7            8
Run Code Online (Sandbox Code Playgroud)

会产生:

[1,2,3,4,5], [6,7,8] 
Run Code Online (Sandbox Code Playgroud)

我尝试使用Observable.Buffer()和一个时间跨度但是从我的实验中,我可以看到定时器在我们订阅可观察序列时立即启动,并在上一个定时器完成后立即重新启动.

因此,使用与前一个示例相同的序列并使用带有时间跨度的Buffer(),我会有这样的事情:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4    5                      6 7           8
Run Code Online (Sandbox Code Playgroud)

哪会产生这个:

[1,2,3,4], [5], [6,7], [8]
Run Code Online (Sandbox Code Playgroud)

以下是我使用Buffer测试此行为的方法:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                               Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                               Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
                               Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
      .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));
Run Code Online (Sandbox Code Playgroud)

随着输出:

4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive observable

5
推荐指数
1
解决办法
852
查看次数

标签 统计

c# ×2

system.reactive ×2

observable ×1

throttling ×1