RX for .NET - 从 Disposable.Create 中调用observer.OnCompleted 时什么也没有发生

JMc*_*JMc 1 c# system.reactive observable

我正在使用 C# 中的 .NET 的 RX 库。谁能向我解释为什么“observer.OnCompleted()”方法在以下代码中不执行任何操作:

var observableStream = Observable.Create<CustomMessage>(
       (observer) =>
           {
               CustomMessage cm = new CustomMessage();
               CustomMessage.Subscribe(observer.OnNext);

               return Disposable.Create(
               () =>
                   {
                       Console.WriteLine("Disposing...");
                       CustomMessage.Unsubscribe(observer.OnNext);                          
                       observer.OnCompleted();                //***Nothing happens here***
                   }
               );
       });

    //IObserver.OnException()
    public override void OnException(Exception e)
    {
        Console.WriteLine("Exception occurred - " + e.Message);
    }

    //IObserver.OnComplete()
    public override void OnUnsubscribe()
    {
        Console.WriteLine("Unsubscribed...");            
    }

    //IObserver.OnNext()
    public override void HandleNextMsg(IRVMessage msg)
    {
        Console.WriteLine("Instance received a message");
    }

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);

//At some later point....
myDisposable.Dispose();
Run Code Online (Sandbox Code Playgroud)

该代码旨在订阅 CustomMessages 流。它在设置订阅时使用我的 CustomMessage 类型注册observer.OnNext() 方法。然后,它在处理订阅时取消注册observer.OnNext()。所有这些都可以正常工作。每当收到 CustomMessage 时,就会调用我的“HandleNextMsg()”方法。

稍后,当我希望终止订阅时,我调用“Dispose()”并成功执行以下两行:

Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext); 
Run Code Online (Sandbox Code Playgroud)

然后我就不再收到自定义消息了。然而,下面的行虽然被执行,但什么也不做:

observer.OnCompleted(); 
Run Code Online (Sandbox Code Playgroud)

我预计它会拨打电话:

Console.WriteLine("Unsubscribed...");
Run Code Online (Sandbox Code Playgroud)

在某些时候,观察者和“OnUnsubscribe”方法之间的连接会丢失,我想确切地了解发生了什么。为什么‘observer.OnNext()’可以成功注销,但‘observer.OnCompleted()’什么都不做?

有人向我指出,仅仅因为我正在处理流并不意味着我应该调用“OnCompleted()”,但我仍然想了解为什么它不起作用。

Tim*_*Tim 5

OnCompleted() 旨在通知订阅者上游序列(在您的情况下为 CustomMessage)已结束。这并不意味着确认订阅者请求取消订阅已成功,这似乎就是您尝试使用它的方式。OnCompleted() 是关于该序列的通知,针对所有订阅者,而不是针对该序列的单个订阅。

换句话说,您不应该在 Dispose 中调用它。毕竟,订阅者正在自行取消订阅,为什么需要通知它呢?

至于没有发生任何事情的实际技术原因,我猜测当您已经进行处理时,回调(按设计)不会进行。只是一个理论,意义不大。