Dmi*_*ruk 6 c# complex-event-processing system.reactive
我有两个对象流,每个对象都有一个Timestamp
值.两个流都是有序的,因此例如时间戳可以是一个流中的T a = 1,3,6,6,7
而另一个流中的T b = 1,2,5,5,6,8
.两个流中的对象属于同一类型.
我希望能够做的是按时间戳顺序将每个事件放在总线上,即放置A 1,然后放置B 1,B 2,A 3等等.此外,由于某些流具有多个具有相同时间戳的(顺序)元素,因此我希望将这些元素分组,以便每个新事件都是一个数组.所以我们将[A 3 ]放在总线上,然后是[A 1 5,A 2 5 ],依此类推.
我试图通过制作两个ConcurrentQueue
结构来实现这一点,将每个事件放在队列的后面,然后查看队列的每个前面,首先选择先前的事件,然后遍历队列,以便存在具有此时间戳的所有事件.
但是,我遇到了两个问题:
我认为Rx可以在这方面提供帮助,但我没有看到明显的组合使这成为可能.因此,非常感谢任何建议.
Tys*_*son 10
Rx确实非常适合IMO的这个问题.
IObservables
由于显而易见的原因,你不能'OrderBy'(你必须首先观察整个流以保证正确的输出顺序),所以下面我的答案假设(你说过)你的2个源事件流是有序的.
这最终是一个有趣的问题.标准的Rx运算符缺少一个GroupByUntilChanged
可以轻松解决这个问题的运算符,只要它OnComplete
在观察到下一组的第一个元素时调用前一个组是可观察的.然而,查看DistinctUntilChanged
它的实现并不遵循这种模式,只有OnComplete
在源observable完成时调用(即使它知道在第一个非不同元素之后将没有更多的元素......很奇怪???).无论如何,出于这些原因,我决定反对一种GroupByUntilChanged
方法(不破坏Rx约定)而是去了一个ToEnumerableUntilChanged
.
免责声明:这是我的第一个Rx扩展,所以我希望得到有关我的选择的反馈.此外,我的一个主要问题是持有该distinctElements
名单的匿名观察者.
首先,您的应用程序代码非常简单:
public class Event
{
public DateTime Timestamp { get; set; }
}
private IObservable<Event> eventStream1;
private IObservable<Event> eventStream2;
public IObservable<IEnumerable<Event>> CombineAndGroup()
{
return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
.ToEnumerableUntilChanged(e => e.Timestamp);
}
Run Code Online (Sandbox Code Playgroud)
现在为ToEnumerableUntilChanged
实现(代码墙警告):
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
{
// TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
var comparer = EqualityComparer<TKey>.Default;
return Observable.Create<IEnumerable<TSource>>(observer =>
{
var currentKey = default(TKey);
var hasCurrentKey = false;
var distinctElements = new List<TSource>();
return source.Subscribe((value =>
{
TKey elementKey;
try
{
elementKey = keySelector(value);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (!hasCurrentKey)
{
hasCurrentKey = true;
currentKey = elementKey;
distinctElements.Add(value);
return;
}
bool keysMatch;
try
{
keysMatch = comparer.Equals(currentKey, elementKey);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (keysMatch)
{
distinctElements.Add(value);
return;
}
observer.OnNext( distinctElements);
distinctElements.Clear();
distinctElements.Add(value);
currentKey = elementKey;
}), observer.OnError, () =>
{
if (distinctElements.Count > 0)
observer.OnNext(distinctElements);
observer.OnCompleted();
});
});
}
Run Code Online (Sandbox Code Playgroud)