为什么我不应该实现IObservable <T>?

TDa*_*ver 12 c# system.reactive

阅读msdn关于Reactive Extensions等等,我发现了一条建议说我不应该实现IObservable,而是使用Observable.Create ......当我读到这篇文章时,我的项目已经有了一个ObservableImplementation<T>类,我就是d用作IObservable源,我想将事件转换为Observables.

我已经阅读了AbstractObservable<T>System.Reactive中的实现,我发现他们的代码和我的代码没有任何重大区别.那么实现IObservable有什么问题?我可以添加自己的属性,依此类推......

为了丰满,这是我的实施,请告诉我,如果我做错了什么!

public sealed class ObservableImplementation<T> : IObservable<T>
{
    class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose)
        {
            _onDispose = onDispose;
        }

        public void Dispose()
        {
            _onDispose();
        }
    }


    public void Raise(T value)
    {
        _observers.ForEach(o => o.OnNext(value));
    }
    public void Completion()
    {
        _observers.ForEach(o => o.OnCompleted());
        _observers.Clear();
    }

    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription(() => _observers.Remove(observer));
        _observers.Add(observer);
        return subscription;
    }
    public bool AnyObserverPresent { get { return _observers.Any(); } }
}
Run Code Online (Sandbox Code Playgroud)

小智 22

我们不建议人们直接实现IObservable <T>的原因有几个.

一个是缺乏对违反观察者语法的保护.例如,您的序列可能会在OnCompleted调用后显示OnNext调用的行为,该调用无效.Observable.Create <T>方法和ObservableBase <T>基类型通过在收到终端消息时自动分离观察者来处理这个问题.因此,即使您的代码执行了错误的操作,观察者也看不到格式错误的序列.

顺便说一句,这类似于C#中的迭代器.实现了IEnumerable <T>手工应该是这样的:当一个枚举的MoveNext返回false(类似于OnCompleted),后续调用不改变他们的想法,并开始返回true(类似于OnNext):

如果MoveNext传递集合的末尾,则枚举数位于集合中的最后一个元素之后,MoveNext返回false.当枚举器处于此位置时,后续对MoveNext的调用也会返回false,直到调用Reset.(来源:MSDN)

在C#2.0或VB 11.0中使用迭代器时,会为您解决此类问题.这类似于我们的Observable.Create <T>方法和ObservableBase <T>基类型.

与上述讨论相关的原因是清理.从订阅上的Dispose调用返回后,观察者是否会再看不到任何消息?在向观察者发送终端消息后,是否会自动调用相关订阅的Dispose逻辑?两者都是非常重要的,所以我们的基础实现会解决这个问题.

另一个原因与我们的CurrentThreadScheduler有关,确保在调度程序上运行时,Subscribe调用可以是异步的.基本上,我们需要检查在调用Subscribe期间是否需要在当前线程上安装trampoline.我们不希望每个人都知道这一点并做正确的事.

在你的特殊情况下 - 正如其他人所注意到的那样 - 你正构建了一个很大的主题.要么只是使用我们的一个主题,要么用你自己的类型包含它(例如,如果你希望发送"观察者"一方可以被接收"可观察"方面的其他方访问).


Ana*_*tts 9

你不应该实现IObservable<T>的原因与你通常没有实现的原因相同IEnumerable<T>,是因为有人很可能已经构建了你想要的东西.在这种情况下,您基本上已经重新实现Subject<T>了大部分内容.

编辑:关于评论中的懒惰问题,我会这样实现:

var lazyObservable = Observable.Create<TFoo>(subj => { /*TODO: Implement Me to load from reflection*/ })
    .Multicast(new Subject<TFoo>())   // This means it'll only calc once
    .RefCount();    // This means it won't get created until someone Subscribes
Run Code Online (Sandbox Code Playgroud)


Her*_*man 6

Rx团队最近的博客文章包含三个原因.因为这是一个冗长的帖子,我复制了相关的部分.

执行合同

Observable.Create接受一个委托,该委托将成为生成的IObservable实现上的Subscribe方法的核心实现.我们围绕这个委托进行了一些巧妙的包装,以强制执行观察者合同,以及其他事情(这就是为什么你不应该自己实现接口).

一次性包装纸

返回的一次性在它周围有一个小包装,用于确保在从Dispose调用返回后不再调用观察者,即使调度程序可能还没有处于良好的停止点.(另一个原因是你不应该手工实现IObservable接口.哦,顺便说一下,还有更多!)

完成后自动处理

这里感兴趣的是在发送OnCompleted下游时应用于源订阅的自动处置行为.(这是强烈建议不要手动实施IObservable的另一个原因.当使用Observable.Create时,我们会为您处理此问题.)