Rx - 可以/我应该用Observable替换.NET事件吗?

Jud*_*ngo 37 c# events system.reactive

鉴于Reactive Extensions(Rx)框架提供的可组合事件的好处,我想知道我的类是否应该停止推送.NET事件,而是暴露Rx可观察量.

例如,使用标准.NET事件获取以下类:

public class Foo
{
   private int progress;
   public event EventHandler ProgressChanged;

   public int Progress
   {
      get { return this.progress; }
      set
      {
         if (this.progress != value)
         {
            this.progress = value;

            // Raise the event while checking for no subscribers and preventing unsubscription race condition.
            var progressChanged = this.ProgressChanged;
            if (progressChanged != null)
            {
                progressChanged(this, new EventArgs());
            }
         }
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

很多单调的管道.

这个类可以使用某种observable来替换这个功能:

public class Foo
{
   public Foo()
   {
       this.Progress = some new observable;
   }

   public IObservable<int> Progress { get; private set; }
}
Run Code Online (Sandbox Code Playgroud)

更少的管道.管道细节不再掩盖意图.这似乎是有益的.

我对你的问题很好StackOverflow人员是:

  1. 用IObservable <T>值替换标准.NET事件是否合适?
  2. 如果我使用一个可观察的,我会在这里使用什么样的?显然我需要向它推送值(例如Progress.UpdateValue(...)或其他东西).

Ana*_*tts 19

对于#2,最直接的方式是通过主题:

Subject<int> _Progress;
IObservable<int> Progress {
    get { return _Progress; }
}

private void setProgress(int new_value) {
    _Progress.OnNext(new_value);
}

private void wereDoneWithWorking() {
    _Progress.OnCompleted();
}

private void somethingBadHappened(Exception ex) {
    _Progress.OnError(ex);
}
Run Code Online (Sandbox Code Playgroud)

有了这个,现在您的"进度"不仅可以通知进度何时发生变化,还可以通知操作何时完成,以及是否成功.但请记住,一旦IObservable通过OnCompleted或OnError完成,它就"死了" - 你不能再发布任何东西了.

  • 我并不是特别担心 - 这就像担心人们会使用Reflection访问私有字段一样,我不认为我的代码客户会积极地让我 (5认同)

Ric*_*lay 7

当有内置主题可以为您执行此操作时,我不建议管理您自己的订户列表.它还消除了携带自己的可变副本T的需要.

以下是我的(无评论)版本的解决方案:

public class Observable<T> : IObservable<T>, INotifyPropertyChanged 
{ 
    private readonly BehaviorSubject<T> values; 

    private PropertyChangedEventHandler propertyChanged; 

    public Observable() : this(default(T))
    {
    } 

    public Observable(T initalValue) 
    { 
        this.values = new BehaviorSubject<T>(initalValue);

        values.DistinctUntilChanged().Subscribe(FirePropertyChanged);
    }

    public T Value 
    { 
        get { return this.values.First(); } 
        set { values.OnNext(value); } 
    }

    private void FirePropertyChanged(T value)
    {
        var handler = this.propertyChanged;

        if (handler != null)
            handler(this, new PropertyChangedEventArgs("Value"));
    }

    public override string ToString() 
    { 
        return value != null ? value.ToString() : "Observable<" + typeof(T).Name + "> with null value."; 
    } 

    public static implicit operator T(Observable<T> input) 
    { 
        return input.Value; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
        return values.Subscribe(observer);
    } 

    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged 
    { 
        add { this.propertyChanged += value; } 
        remove { this.propertyChanged -= value; } 
    } 
}
Run Code Online (Sandbox Code Playgroud)