Hot Concat in Rx

J. *_*non 9 c# system.reactive

Observable.Concat is an implementation that concatenates observables, but the second IObservable<T> is only subscribed to once the first one completes.

http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat
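To make the problem concrete, here is a minimal sketch of what happens when the second source is hot. It assumes the System.Reactive package. The class name ConcatDemo and the values are illustrative only and do not come from the original post.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ConcatDemo
{
    // Illustrative helper: returns the values an observer of first.Concat(second) receives.
    public static List<int> Run()
    {
        var first = new Subject<int>();   // hot source
        var second = new Subject<int>();  // hot source
        var received = new List<int>();

        using (first.Concat(second).Subscribe(received.Add))
        {
            first.OnNext(1);
            second.OnNext(100);  // lost: Concat has not subscribed to 'second' yet
            first.OnCompleted(); // Concat subscribes to 'second' at this point
            second.OnNext(200);
        }

        return received; // contains 1 and 200, but not 100
    }
}
```

The value 100 is dropped because nothing is listening to the second subject while the first is still running. A "hot" Concat would have to capture that value somehow.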

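Is there an implementation of a "HotConcat"? It would be similar to Observable.Merge but keep the delivery order: first push the elements of the initial subscription, then those of the subsequent ones. Something like this: [figure: Hot Concat]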

I know ReplaySubject<T> could be used, but it does not seem like a good fit because of its impact on performance and memory usage.
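For reference, the ReplaySubject<T> approach mentioned above could look roughly like the following sketch. ReplayConcat and ReplayConcatSketch are hypothetical names, not part of System.Reactive. The sketch deliberately ignores disposal of the eager subscriptions.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayConcatSketch
{
    // Hypothetical helper, not a System.Reactive API.
    public static IObservable<T> ReplayConcat<T>(params IObservable<T>[] sources)
    {
        // Subscribe to every source immediately. Each ReplaySubject records
        // all notifications of its source so they can be replayed later.
        var replays = sources.Select(source =>
        {
            var replay = new ReplaySubject<T>();
            source.Subscribe(replay); // never disposed in this sketch
            return (IObservable<T>)replay;
        }).ToArray();

        // Concat then drains the recorded sequences one after another, in order.
        return replays.Concat();
    }
}
```

This preserves the order, but every ReplaySubject<T> keeps all of its values for as long as it is alive, even after they have been delivered. That is the memory cost the question refers to.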

Bra*_*don 9

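Here is an implementation I have been using for a while. It introduces a BufferUntilSubscribed operator that turns an IObservable into an IConnectableObservable. The connectable observable starts buffering when you call Connect and delivers the buffered results to the first subscriber. Once the first subscriber has "caught up", buffering stops and the subscriber receives live events as they arrive.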

Once you have that, you can write HotConcat like this:

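// Requires: using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq;
public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources)
{
    // Materialize the wrappers once. Select is lazy, so without ToArray() the instances
    // connected below would not be the same instances that are concatenated later.
    var s2 = sources.Select(s => s.BufferUntilSubscribed()).ToArray();
    var subscriptions = new CompositeDisposable(s2.Select(c => c.Connect()).ToArray());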
    return Observable.Create<T>(observer =>
    {
        var s = new SingleAssignmentDisposable();
        var d = new CompositeDisposable(subscriptions);
        d.Add(s);

        s.Disposable = s2.Concat().Subscribe(observer);

        return d;
    });
}

Here is the implementation of BufferUntilSubscribed:

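// Requires: using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Subjects;
// A private class must be nested, e.g. inside the static class that holds the extension methods below.
private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T>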
{
    private readonly IObservable<T> _source;
    private readonly IScheduler _scheduler;
    private readonly Subject<T> _liveEvents;
    private bool _observationsStarted;
    private Queue<T> _buffer;
    private readonly object _gate;

    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler)
    {
        _source = source;
        _scheduler = scheduler;
        _liveEvents = new Subject<T>();
        _buffer = new Queue<T>();
        _gate = new object();
        _observationsStarted = false;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            if (_observationsStarted)
            {
                return _liveEvents.Subscribe(observer);
            }

            _observationsStarted = true;

            var bufferedEvents = GetBuffers().Concat().Finally(RemoveBuffer); // Finally clause to remove the buffer if the first observer stops listening.
            return Observable.Merge(_liveEvents, bufferedEvents).Subscribe(observer);
        }
    }

    public IDisposable Connect()
    {
        return _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted);
    }

    private void RemoveBuffer()
    {
        lock (_gate)
        {
            _buffer = null;
        }
    }

    /// <summary>
    /// Acquires a lock and checks the buffer.  If it is empty, then replaces it with null and returns null.  Else replaces it with an empty buffer and returns the old buffer.
    /// </summary>
    /// <returns></returns>
    private Queue<T> GetAndReplaceBuffer()
    {
        lock (_gate)
        {
            if (_buffer == null)
            {
                return null;
            }

            if (_buffer.Count == 0)
            {
                _buffer = null;
                return null;
            }

            var result = _buffer;
            _buffer = new Queue<T>();
            return result;
        }
    }

    /// <summary>
    /// An enumerable of buffers that will complete when a call to GetAndReplaceBuffer() returns a null, e.g. when the observer has caught up with the incoming source data.
    /// </summary>
    /// <returns></returns>
    private IEnumerable<IObservable<T>> GetBuffers()
    {
        Queue<T> buffer;
        while ((buffer = GetAndReplaceBuffer()) != null)
        {
            yield return buffer.ToObservable(_scheduler);
        }
    }

    private void OnNext(T item)
    {
        lock (_gate)
        {
            if (_buffer != null)
            {
                _buffer.Enqueue(item);
                return;
            }
        }

        _liveEvents.OnNext(item);
    }
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler)
{
    return new BufferUntilSubscribedObservable<T>(source, scheduler);
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source)
{
    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate);
}