IObservable - 如何发送/发布/将新值推送到集合

vdh*_*ant 4 .net c# asynchronous system.reactive

我想从我的服务层公开一个IObservable.

为简单起见,我们假设服务层内部从远程服务器(通过套接字)获取消息,并且套接字库需要一个IMessageReponse对象,该对象具有要传递给它的MessageReceived方法.

在内部,服务层创建MessageResponse对象,并在消息到达时通过Action回调通知.

鉴于这种设计,我需要能够将新消息推送到IObservable,但在我看到的任何示例中,Observable.XYZ似乎不支持简单的Send/Publish/Push方法......

如何在此场景中连接我的Observable.XYZ ???

我想要这样的东西......注意我知道这是IObservable的一个非常基本的实现,但我不会想我自己需要编写这个代码......我会想到有些东西会存在于我身边盒子外面.

public class PushObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    public void Send(T value)
    {
        foreach (var listener in _listeners) 
            listener.OnNext(value); 
    }

    public IDisposable Subscribe(IObserver<T> observer)
    { 
        _listeners.Add(observer);
    }
}
Run Code Online (Sandbox Code Playgroud)

Ana*_*tts 5

你已经重写了一个已经存在的对象!你的"PushObservable"实际上是Subject<T>它,它是Rx中的基本对象之一.

如果你真的想以Rx方式考虑这个问题,你可能IObservable<byte[]>会从套接字开始,然后你会选择这个IObservable<IMessageResponse>,因为在一天结束时,你正在响应的事件是从线上掉下来的字节.