如何实现 IObservable

Ber*_*ian 2 c# system.reactive observable .net-core

我希望能够将 aQueue作为 an使用Observable,但我不知道如何创建它。我希望它能够在有人调用Enqueue.

class Producer:IObservable<int>
{
    private object @lock = new object();
    private Queue<int> queue = new Queue<int>();
    List<IObserver<int>> observers = new List<IObserver<int>>();

    public Producer()
    {
    }
    public IObservable<int> ToObservable()
    {
        return ///
    }
    public bool Enqueue(int sample)
    {
        if (sample == null)
        {
            return false;
        }
        this.queue.Enqueue(sample);
        return true;
    }
    public int Dequeue()
    {
        if(!this.queue.TryDequeue(out Sample rez))
        {
            return 0;
        }
        return rez;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
    }
}
Run Code Online (Sandbox Code Playgroud)

我可以与处理部分SubscriberIObserver我只是不知道如何包装我的Producer对象在IObservable

Chr*_*eld 5

通常你不会IObservable<T>自己实现来支持Reactive Extensions。将 observables 视为经典事件的替代品,如下所示:

class Producer
{
    private Queue<int> _queue = new Queue<int>();
    private Subject<int> _whenEnqueued = new Subject<int>();

    public IObservable<int> WhenEnqueued => _whenEnqueued.AsObservable();        

    public void Enqueue(int value)
    {
        _queue.Enqueue(value);
        _whenEnqueued.OnNext(value);
    }
}
Run Code Online (Sandbox Code Playgroud)

通过这种方式,外部类可以WhenEnqueued通过将工作委托给类来订阅并获得新值的通知Subject

如果您真的想实施IObservable<T>(通常您不想实施),请查看IObservable 文档中的示例

  • 请在 `public IObservable&lt;int&gt; WhenEnqueued =&gt; _whenEnqueued;` 行上放一个 `.AsObservable()`。现在,您正在传递对主题的引用,任何外部代码都可以将其转换为并结束您的主题。 (3认同)