我有一个Rx Observable充当缓冲区.现在,当它获得10个项目时,或者在100毫秒之后,以先到者为准,它在订阅中执行该方法.
我注意到我的方法每100毫秒不断被调用,即使缓冲区中没有项目,这让我感到惊讶.如果它没有收到缓冲区中的任何项目,那么只需让我的方法立即返回就足够了,但我觉得奇怪的是它只是在背景中像这样生长.
为什么是这样?你怎么建议我最好处理这个?我是Rx的全新手,所以也许我正在做一些奇怪的事情.这是我的代码的简化版本:
private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;
public MyBufferClass(IMyServer server, IScheduler scheduler)
{
this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();
this.serverRequests
.Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
.Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}
public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}
public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
if (idsWithCallbacks.IsNullOrEmpty()) return;
this.server.GetMultipleItems(idsWithCallbacks)
}
Run Code Online (Sandbox Code Playgroud)
在我的测试中,如果我调用GetSingleItemFromServer 5次然后将我的TestScheduler推进1000毫秒,我认为GetMultipleItemsFromServer只会被调用一次,但它会被调用10次.
我有一个可观察的序列.插入第一个元素时,我想在计时器的时间跨度内启动计时器并批量后续插入的元素.然后,计时器不会再次启动,直到序列中插入另一个元素.
所以像这样:
--------|=====timespan====|---------------|=====timespan====|-------------->
1 2 3 4 5 6 7 8
Run Code Online (Sandbox Code Playgroud)
会产生:
[1,2,3,4,5], [6,7,8]
Run Code Online (Sandbox Code Playgroud)
我尝试使用Observable.Buffer()和一个时间跨度但是从我的实验中,我可以看到定时器在我们订阅可观察序列时立即启动,并在上一个定时器完成后立即重新启动.
因此,使用与前一个示例相同的序列并使用带有时间跨度的Buffer(),我会有这样的事情:
|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
1 2 3 4 5 6 7 8
Run Code Online (Sandbox Code Playgroud)
哪会产生这个:
[1,2,3,4], [5], [6,7], [8]
Run Code Online (Sandbox Code Playgroud)
以下是我使用Buffer测试此行为的方法:
var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
Observable.Never<int>());
Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
.Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));
Run Code Online (Sandbox Code Playgroud)
随着输出:
4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 …Run Code Online (Sandbox Code Playgroud)