使用IObservable而不是事件

Sim*_*mon 9 c# events system.reactive

我最近一直在阅读有关IObservable的内容.到目前为止,我已经查看了各种SO问题,并观看了他们可以做什么的视频.我正在思考的整个"推动"机制非常棒,但我仍在努力弄清楚究竟是什么.从我的读数来看,我认为某种方式IObservable可以被"观察",并且IObservers是"观察者".

所以现在我要尝试在我的应用程序中实现它.在我开始之前,有一些事情我想坚持下去.我已经看到IObservable与IEnumerable相反,但是,我无法在我的特定实例中看到任何可以合并到我的应用程序中的地方.

目前,我大量使用事件,以至于我可以看到"管道"开始变得无法管理.我想,IObservable可以帮助我.

考虑以下设计,这是我的应用程序中的I/O包装(仅供参考,我通常需要处理字符串):

我有一个基本接口叫IDataIO:

public interface IDataIO
{
  event OnDataReceived;
  event OnTimeout:
  event OnTransmit;
}
Run Code Online (Sandbox Code Playgroud)

现在,我目前有三个实现此接口的类,这些类中的每一个都以某种方式利用异步方法调用,引入了某种类型的多线程处理:

public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;
Run Code Online (Sandbox Code Playgroud)

每个类的一个实例都包含在我的最终类中,称为IO(它也实现了IDataIO - 遵循我的策略模式):

public class IO : IDataIO
{
  public SerialIO Serial;
  public UdpIO Udp;
  public TcpIO Tcp;
}
Run Code Online (Sandbox Code Playgroud)

我已经利用策略模式来封装这三个类,这样当IDataIO在运行时在不同实例之间进行更改时,它会对最终用户"隐藏".你可以想象,这导致了背景中的"事件管道".

那么,我如何在我的案例中使用"推送"通知?我想简单地将数据推送给任何感兴趣的人,而不是订阅事件(DataReceived等).我有点不确定从哪里开始.我还在尝试用它的想法/泛型类Subject以及它的各种形式(ReplaySubject/AsynSubject/BehaviourSubject).有人可以请教我这个(也许参考我的设计)?或者这根本不适合IObservable

PS.随意纠正我的任何"误解":)

Ric*_*chK 8

Observable非常适合表示数据流,因此您的DataReceived事件可以很好地模拟可观察的模式,例如IObservable<byte>IObservable<byte[]>.您还可以获得额外的好处,OnError并且OnComplete方便使用.

在实现它方面,很难说出您的具体情况,但我们经常将其Subject<T>用作底层源并调用OnNext推送数据.也许是这样的

// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate is you're using the Rx extensions
    this.subject.OnNext(port.Data); // pseudo code 
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以通过属性公开主题:

public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}
Run Code Online (Sandbox Code Playgroud)

您可以取代你的DataReceived事件IDataIOIObservable<T>,让每个战略类处理他们的数据,他们需要取其方式,推动关闭的Subject<T>.

在另一边,谁订阅了可观察到,然后能够自行处理它像一个事件(只需使用Action<byte[]>),也可以对数据流进行一些真正有用的工作Select,Where,Buffer,等.

private IDataIO dataIo = new ...

private void SubscribeToData()
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}
Run Code Online (Sandbox Code Playgroud)

ReplaySubject/ ConnectableObservables为伟大的,当你知道你的用户将被迟到的党,但仍然需要追赶上的所有事件.源缓存它所推送的所有内容并为每个订户重放所有内容.只有你可以说这是否是你真正需要的行为(但要小心,因为它会缓存所有会增加你的内存使用量的东西).

当我在学习Rx时,我发现http://licampbell.blogspot.co.uk/关于Rx的博客系列对理解这个理论非常有用(帖子现在有点过时,API已经改变了,所以要注意这个)


Gid*_*rth 5

这绝对是 observables 的理想情况。该IO班将可能看到的最大改善。首先,让我们更改接口以使用 observables,看看组合类变得多么简单。

public interface IDataIO
{
    //you will have to fill in the types here.  Either the event args
    //the events provide now or byte[] or something relevant would be good.
    IObservable<???> DataReceived;
    IObservable<???> Timeout;
    IObservable<???> Transmit;
}

public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;

    public IObservable<???> DataReceived
    {
        get 
        {
            return Observable.Merge(Serial.DataReceived,
                                    Udp.DataReceived,
                                    Tcp.DataReceived);
        }
    }

    //similarly for other two observables
}
Run Code Online (Sandbox Code Playgroud)

边注:您可能会注意到我更改了接口成员名称。在 .NET 中,事件通常被命名<event name>,引发它们的函数被调用On<event name>

对于生产类,您有一些取决于实际来源的选项。假设您在 中使用 .NET SerialPort 类SerialIODataReceived返回一个IObservable<byte[]>. 由于 SerialPort 已经有一个接收数据的事件,您可以直接使用它来制作您需要的可观察对象。

public class SerialIO : IDataIO
{
    private SerialPort _port;

    public IObservable<byte[]> DataRecived
    {
        get
        {
            return Observable.FromEventPattern<SerialDataReceivedEventHandler,
                                               SerialDataReceivedEventArgs>(
                        h => _port.DataReceived += h,
                        h => _port.DataReceived -= h)
                   .Where(ep => ep.EventArgs.EventType == SerialData.Chars)
                   .Select(ep =>
                           {
                              byte[] buffer = new byte[_port.BytesToRead];
                              _port.Read(buffer, 0, buffer.Length);
                              return buffer;
                           });
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

对于没有现有事件源的情况,您可能需要使用 RichK 建议的主题。他的回答很好地涵盖了这种使用模式,所以我不会在这里重复。

您没有展示如何使用此接口,但根据用例,让这些类上的其他函数返回IObservable本身并完全消除这些“事件”可能更有意义。使用基于事件的异步模式,您必须将事件与您调用的函数分开以触发工作,但是使用 observables,您可以从函数中返回它们,从而使您订阅的内容更加明显。这种方法还允许从每次调用返回的可观察对象发送OnErrorOnCompleted消息来表示操作结束。根据您对组合类的使用,我不希望这在这种特殊情况下有用,但需要牢记这一点。