什么是IObservable.Subscribe(IObserver <T>观察者)重载?

Jac*_*eja 3 .net system.reactive

当我写.Subscribe表达式时,我经常发现Resharper为我选择了以下重载,位于mscorlib,Version = 4.0.0.0:

namespace System
{
  public interface IObservable<out T>
  {
    IDisposable Subscribe(IObserver<T> observer);
  }
}
Run Code Online (Sandbox Code Playgroud)

这看起来与大多数重载非常不同,这些重载Action都来自mscorlib,而不是System.Reactive.*我期望大多数Reactive的东西.

这次超载有什么作用?该如何使用?怎么IObserver<T>涉及Action?为什么mscorlib会出现这种单一过载?

Shl*_*omo 5

要清楚,这不是一个过载,这是Rx真正的核心.所有其他Subscribe方法,以及所有其他操作符,您已经习惯的是最终调用它的扩展方法.

如果您查看早期文档或Rx,您会看到创建者将其视为LINQ的基于推送的一面.所以很多东西都是你在LINQ中看到的镜像.IObservable是镜子IEnumerable,IObserver是镜子IEnumerator.

但是,因为push是pull的反面,所以Rx版本与它们基于pull的版本相反:

  • IEnumerable定义一个产生一个的方法IEnumerator.IObservable定义了一个方法,该方法取入IObserver.
  • 如果您将IEnumerator.MoveNext()+ IEnumerator.Current视为一个操作,则可以通过以下三种方式之一返回:返回下一个元素,到达收集结束或抛出异常.同样,IObserver必须处理三种情况:next element(OnNext),stream(OnCompleted)end 或exception(OnError).

更熟悉的"重载" Subscribe实际上只是扩展方法,看起来像这样:

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
    return Subscribe(onNext, e => {/*onError */}, () => {/*onCompleted*/);
}

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
}
Run Code Online (Sandbox Code Playgroud)