相关疑难解决方法(0)

当缓冲区不包含任何项目时,为什么Rx缓冲区连续执行方法?

我有一个Rx Observable充当缓冲区.现在,当它获得10个项目时,或者在100毫秒之后,以先到者为准,它在订阅中执行该方法.

我注意到我的方法每100毫秒不断被调用,即使缓冲区中没有项目,这让我感到惊讶.如果它没有收到缓冲区中的任何项目,那么只需让我的方法立即返回就足够了,但我觉得奇怪的是它只是在背景中像这样生长.

为什么是这样?你怎么建议我最好处​​理这个?我是Rx的全新手,所以也许我正在做一些奇怪的事情.这是我的代码的简化版本:

private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}   

public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
    this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}

public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
    if (idsWithCallbacks.IsNullOrEmpty()) return;

    this.server.GetMultipleItems(idsWithCallbacks)
}
Run Code Online (Sandbox Code Playgroud)

在我的测试中,如果我调用GetSingleItemFromServer 5次然后将我的TestScheduler推进1000毫秒,我认为GetMultipleItemsFromServer只会被调用一次,但它会被调用10次.

.net c# system.reactive

8
推荐指数
2
解决办法
2273
查看次数

RX - 可观察序列中元素的组/批突发

我有一个可观察的序列.插入第一个元素时,我想在计时器的时间跨度内启动计时器并批量后续插入的元素.然后,计时器不会再次启动,直到序列中插入另一个元素.

所以像这样:

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 7            8
Run Code Online (Sandbox Code Playgroud)

会产生:

[1,2,3,4,5], [6,7,8] 
Run Code Online (Sandbox Code Playgroud)

我尝试使用Observable.Buffer()和一个时间跨度但是从我的实验中,我可以看到定时器在我们订阅可观察序列时立即启动,并在上一个定时器完成后立即重新启动.

因此,使用与前一个示例相同的序列并使用带有时间跨度的Buffer(),我会有这样的事情:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4    5                      6 7           8
Run Code Online (Sandbox Code Playgroud)

哪会产生这个:

[1,2,3,4], [5], [6,7], [8]
Run Code Online (Sandbox Code Playgroud)

以下是我使用Buffer测试此行为的方法:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                               Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                               Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
                               Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
      .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));
Run Code Online (Sandbox Code Playgroud)

随着输出:

4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive observable

5
推荐指数
1
解决办法
852
查看次数

标签 统计

c# ×2

system.reactive ×2

.net ×1

observable ×1