为什么在所有初始订户断开连接后RefCount无法正常工作?

Ken*_*art 7 .net c# system.reactive

考虑以下:

[Fact]
public void foo()
{
    var result = new Subject<bool>();
    var startCount = 0;
    var completionCount = 0;
    var obs = Observable
        .Defer(() =>
            {
                ++startCount;
                return result.FirstAsync();
            })
        .Do(_ => ++completionCount)
        .Publish()
        .RefCount();

    // pretend there are lots of subscribers at once
    var s1 = obs.Subscribe();
    var s2 = obs.Subscribe();
    var s3 = obs.Subscribe();

    // even so, we only expect to be started once
    Assert.Equal(1, startCount);
    Assert.Equal(0, completionCount);

    // and we won't complete until the result ticks through
    result.OnNext(true);
    Assert.Equal(1, startCount);
    Assert.Equal(1, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();

    // now try exactly the same thing again
    s1 = obs.Subscribe();
    s2 = obs.Subscribe();
    s3 = obs.Subscribe();

    // startCount is 4 here instead of the expected 2!
    Assert.Equal(2, startCount);
    Assert.Equal(1, completionCount);

    result.OnNext(true);
    Assert.Equal(2, startCount);
    Assert.Equal(2, completionCount);

    s1.Dispose();
    s2.Dispose();
    s3.Dispose();
}
Run Code Online (Sandbox Code Playgroud)

我对Publish+的理解RefCount是,只要存在至少一个订户,就保持与源的连接.一旦最后一个用户断开连接,任何未来的用户将重新启动与源的连接.

正如你在我的测试中看到的那样,一切都在第一次完美运行.但第二次,管道内的延迟可观察对每个新用户执行一次.

我可以通过调试器看到,对于第一组订户,obs._count(对订户计数)会增加每次调用Subscribe.但对于第二组订户,它仍为零.

为什么会发生这种情况,我该怎么做才能纠正我的管道?

use*_*190 1

这是因为底层的可观察结果已经完成。因此每个新订阅者只会收到 OnCompleted 回调。

如果 ObservableDefer 每次都创建一个新序列或一个未完成的序列,您将看到所需的行为。

例如

return result.FirstAsync().Concat(Observable.Never<bool>());
Run Code Online (Sandbox Code Playgroud)

您需要删除Assert.Equal(1, completionCount);