"合并"流的流以产生每个流的最新值

Ale*_*ill 7 c# system.reactive

我有一个IObservable<IObservable<T>>地方,每个内部IObservable<T>是一个价值流​​,然后是最终的OnCompleted事件.

我想将其转换为IObservable<IEnumerable<T>>一个流,该流由来自任何未完成的内部流的最新值组成.IEnumerable<T>每当从一个内部流(或内部流到期)产生新值时,它应该产生一个新的

最容易用大理石图表显示(我希望它足够全面):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            
Run Code Online (Sandbox Code Playgroud)

([]是空的IEnumerable<T>-|代表OnCompleted)

您可以看到它略微类似于CombineLatest操作.我一直在玩弄JoinGroupJoin无济于事,但我觉得这是几乎可以肯定的是标题正确的方向.

我想在这个运算符中使用尽可能少的状态.

更新

我已经更新了这个问题,不仅包括单值序列 - 结果IObservable<IEnumerable<T>>应该只包括每个序列的最新值 - 如果序列没有产生值,则不应该包括它.

Bra*_*don 3

这是基于您昨天的解决方案的版本,并根据新要求进行了调整。基本思想是仅将引用放入易腐烂的集合中,然后在内部序列生成新值时更新引用的值。

我还进行了修改,以正确跟踪内部订阅,并在外部可观察对象取消订阅时取消订阅。

还进行了修改,以便在任何流产生错误时将其全部拆除。

最后,我修复了一些可能违反 Rx 准则的竞争条件。如果你的内部可观察对象从不同的线程同时触发,你可能会同时调用,obs.OnNext这是一个很大的禁忌。因此,我使用相同的锁对每个内部可观察对象进行门控以防止这种情况发生(请参阅Synchronize调用)。请注意,正因为如此,您可能可以使用常规双链表而不是 ,PerishableCollection因为现在使用该集合的所有代码都在锁内,因此您不需要PerishableCollection.

// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value, lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },
                    obs.OnError, // handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                            subscriptions.Remove(subscription); // remove this subscription
                        }
                    }
            );
        });

        return subscriptions;
    });
}
Run Code Online (Sandbox Code Playgroud)