如何使用Reactive Extensions实现事件

Med*_*o42 7 c# events system.reactive

Reactive Extensions允许您轻松订阅使用的活动Observable.FromEventPattern,但是当您拥有活动时,我找不到任何有关如何实施活动的信息IObservable.

我的情况是这样的:我需要实现一个包含事件的接口.每当我的对象的某个值发生变化时,就会调用该事件,出于线程安全的原因,我需要在某个事件上调用此事件SynchronizationContext.我也应该在注册时使用当前值调用每个事件处理程序.

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}
Run Code Online (Sandbox Code Playgroud)

使用Rx获得一个可以完成我想要的观察是相当容易的BehaviorSubject:

public class FooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);
        m_observable = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext);
    }

    public event FooChangedHandler FooChanged
    {
        add { /* ??? */ }
        remove { /* ??? */ }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我正在寻找一种简单的方法有addremove功能的订阅和取消订阅传递的FooChangedHandler作为Observer<Foo>m_observable.我目前的实现看起来类似于:

    add
    {
        lock (m_lock)
        {
            IDisposable disp = m_observable.Subscribe(value);
            m_registeredObservers.Add(
                new KeyValuePair<FooChangedHandler, IDisposable>(
                    value, disp));
        }
    }

    remove
    {
        lock (m_lock)
        {
            KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
                m_registeredObservers
                    .First(pair => object.Equals(pair.Key, value));
            m_registeredObservers.Remove(observerDisposable);
            observerDisposable.Value.Dispose();
        }
    }
Run Code Online (Sandbox Code Playgroud)

但是,我希望找到一个更简单的解决方案,因为我需要实现其中几个事件(不同的处理程序类型).我试图推出自己的通用解决方案,但它会产生一些需要解决的其他问题(特别是,你如何通常使用带有参数的委托T),所以我宁愿找到一个现有的解决方案,在这方面的差距 - 正如FromEventPattern相反.

Ric*_*ein 2

你可以这样做:

public event FooChangedHandler FooChanged
{
    add { m_observable.ToEvent().OnNext += value; }
    remove { m_observable.ToEvent().OnNext -= value; }
}
Run Code Online (Sandbox Code Playgroud)

但是,在删除时,我认为您可能只是想处理订阅...或者从 ToEvent() 获取操作并将其存储为成员。未经测试。

编辑:但是,您必须使用 Action 而不是 FooChangedHandler 委托。

编辑2:这是一个经过测试的版本。但是,我想您需要使用 FooChangedHandler,因为您有一堆这些预先存在的处理程序?

void Main()
{
    IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
    var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
    watcher.FooChanged += o => o.X.Dump();  
    foos.Subscribe(watcher.Subject.OnNext); 
}

// Define other methods and classes here

//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
    event Action<Foo> FooChanged;
}

public class Foo {
    public int X { get; set; }
}
public class FooWatcher
{

    private readonly BehaviorSubject<Foo> m_subject;
    public BehaviorSubject<Foo> Subject { get { return m_subject; } }
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        m_observable = m_subject
            .DistinctUntilChanged();
    }

    public event Action<Foo> FooChanged
    {
        add { m_observable.ToEvent().OnNext += value; }
        remove { m_observable.ToEvent().OnNext -= value; }
    }
}
Run Code Online (Sandbox Code Playgroud)