使用Rx将多个事件源合并到一个IObservable中

Cel*_*Cel 9 .net c# event-handling system.reactive

这是关于如何在特定事件相关方案中使用Reactive Extensions(Rx)的问题.

  • 目的是采取一些触发某些事件的课程
  • 并将它们聚集成一个IObservable可以被任何客户订阅的(不知道事件类).
  • 注意感兴趣的事件使用子类 EventArgs

一些自定义EventArgs

public class HappenedEventArgs : EventArgs
{
    internal bool IsBadNotGood;
}
Run Code Online (Sandbox Code Playgroud)

发生事件的许多单独类别

public class EventSourceA : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class calls OnHappened(e) whenever it decides to ...
}

public class EventSourceB : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class also calls OnHappened(e) at times ...
}

public interface IEventSource
{
    event HappenedEventHandler Happened;
}

public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);
Run Code Online (Sandbox Code Playgroud)

如何聚集所有这些事件并揭露联合事件阵线

public class Pooler{

    private IObservable<X> _pool;

    public IObservable<X> Subscribe(){
        return _pool;        
    }

    public void Register(IEventSource item)
    {
        // How to take item.Happened and inject/bind it into _pool here?
    }        

    internal void Unregister(IEventSource item)
    {
        // Disconnect item.Happened from _pool
    }

    public Pooler(){
        // Instantiate _pool to whatever is best?
        // _pool = ...
    }

 }
Run Code Online (Sandbox Code Playgroud)

直接了解EventSources的订阅者

 static void Try() {
     var pooler = new Pooler();
     pooler.Subscribe().Subscribe(e =>
            {
                 // Do something with events here, as they arrive
            }
     );
     // ....
     // Wherever whenever:
     AddEventSources(pooler);
 }

 static void AddEventSources(Pooler pooler){
     var eventSourceA = new EventSourceA();
     pooler.Register(eventSourceA);
     var eventSourceB = new EventSourceB();
     pooler.Register(eventSourceB);     
 }
Run Code Online (Sandbox Code Playgroud)

Ast*_*sti 7

Rx库试图提供的是处理诸如此类情况的方法,而无需创建一堆手动传播可观察对象的类/方法.

假设你有一个有活动的课程:

public class EventedClass
{
    public event Action<EventArgs> Event;
}
Run Code Online (Sandbox Code Playgroud)

并且这些实例中有大量的实例IEnumerable<EventedClass> objects,您可以使用LINQ将observable投影到这些类中,将它们组合在一起Observable.Merge可以为您提供这些事件的组合顺序输出.

Observable.Merge(
    objects.Select(
        o => Observable.FromEvent<EventArgs>(
            handler => o.Event += handler,
            handler => o.Event -= handler
        )
)).Subscribe(args => 
{ 
    //do stuff
});
Run Code Online (Sandbox Code Playgroud)