如何组合两个按时间戳分组的流?

Dmi*_*ruk 6 c# complex-event-processing system.reactive

我有两个对象流,每个对象都有一个Timestamp值.两个流都是有序的,因此例如时间戳可以是一个流中的T a = 1,3,6,6,7而另一个流中的T b = 1,2,5,5,6,8.两个流中的对象属于同一类型.

我希望能够做的是按时间戳顺序将每个事件放在总线上,即放置A 1,然后放置B 1,B 2,A 3等等.此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组.所以我们将[A 3 ]放在总线上,然后是[A 1 5,A 2 5 ],依此类推.

我试图通过制作两个ConcurrentQueue结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前面,首先选择先前的事件,然后遍历队列,以便存在具有此时间戳的所有事件.

但是,我遇到了两个问题:

  • 如果我让这些队列无限制,我会快速耗尽内存,因为读取操作比接收事件的处理程序快得多.(我有几千兆字节的数据).
  • 我有时会遇到一个情况,我会在A 2 5到达之前处理事件,例如A 1 5.我不知何故需要防范这一点.

我认为Rx可以在这方面提供帮助,但我没有看到明显的组合使这成为可能.因此,非常感谢任何建议.

Tys*_*son 10

Rx确实非常适合IMO的这个问题.

IObservables 由于显而易见的原因,你不能'OrderBy'(你必须首先观察整个流以保证正确的输出顺序),所以下面我的答案假设(你说过)你的2个源事件流是有序的.

这最终是一个有趣的问题.标准的Rx运算符缺少一个GroupByUntilChanged可以轻松解决这个问题的运算符,只要它OnComplete在观察到下一组的第一个元素时调用前一个组是可观察的.然而,查看DistinctUntilChanged它的实现并不遵循这种模式,只有OnComplete在源observable完成时调用(即使它知道在第一个非不同元素之后将没有更多的元素......很奇怪???).无论如何,出于这些原因,我决定反对一种GroupByUntilChanged方法(不破坏Rx约定)而是去了一个ToEnumerableUntilChanged.

免责声明:这是我的第一个Rx扩展,所以我希望得到有关我的选择的反馈.此外,我的一个主要问题是持有该distinctElements名单的匿名观察者.

首先,您的应用程序代码非常简单:

    public class Event
    {
        public DateTime Timestamp { get; set; }
    }

    private IObservable<Event> eventStream1;
    private IObservable<Event> eventStream2; 

    public IObservable<IEnumerable<Event>> CombineAndGroup()
    {
        return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
            .ToEnumerableUntilChanged(e => e.Timestamp);
    }
Run Code Online (Sandbox Code Playgroud)

现在为ToEnumerableUntilChanged实现(代码墙警告):

    public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
    {
        // TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
        var comparer = EqualityComparer<TKey>.Default;

        return Observable.Create<IEnumerable<TSource>>(observer =>
        {
            var currentKey = default(TKey);
            var hasCurrentKey = false;
            var distinctElements = new List<TSource>();

            return source.Subscribe((value =>
            {
                TKey elementKey;
                try
                {
                    elementKey = keySelector(value);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasCurrentKey)
                {
                    hasCurrentKey = true;
                    currentKey = elementKey;
                    distinctElements.Add(value);
                    return;
                }

                bool keysMatch;
                try
                {
                    keysMatch = comparer.Equals(currentKey, elementKey);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (keysMatch)
                {
                    distinctElements.Add(value);
                    return;
                }

                observer.OnNext( distinctElements);

                distinctElements.Clear();
                distinctElements.Add(value);
                currentKey = elementKey;

            }), observer.OnError, () =>
            {
                if (distinctElements.Count > 0)
                    observer.OnNext(distinctElements);

                observer.OnCompleted();
            });
        });
    }
Run Code Online (Sandbox Code Playgroud)