使用反应式扩展(rx)枚举及时发生的事件

hal*_*ton 4 .net c# system.reactive

 public interface Event
 {
      Guid identifier;
      Timestamp ts;
 }
Run Code Online (Sandbox Code Playgroud)

我们正在考虑使用Reactive Extensions来重写我的金融公司的问题.

前提是我们通过Guid(股票代码+嵌入其中的唯一性熵),时间戳和值字段来识别事件.它们的速度很快,我们不能在X秒(10秒)后"至少"之前对这些物体采取行动,之后我们必须对它们采取行动,然后将它们从系统中移除.

想想它就像两个窗口,一个"10秒"的初始窗口(例如T0到T10),我们在那里识别所有独特的事件(基本上是guid组),然后我们查看下一个"10秒","辅助窗口"(T10-T20),以确保我们实施"至少"10秒的政策.从"初始窗口",我们删除所有事件(因为我们已经考虑了它们),然后从"辅助窗口"中删除"初始窗口"中发生的事件.我们继续移动10秒推拉窗,所以现在我们正在看窗口T20-T30,重复并冲洗.

我怎么能在Rx中实现它,因为它似乎是要走的路.

yam*_*men 5

如果您可以依赖服务器时钟和消息中的时间戳(也就是说,我们处于'现实生活'模式),并且您在滑动10秒延迟而不是跳跃10秒窗口之后,那么您可以将事件延迟10秒:

var events = new Subject<Event>();  
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));
Run Code Online (Sandbox Code Playgroud)

检查唯一事件等只是将它们添加到某种类型的集合中:

var guidSet = new HashSet<Guid>();  
delayedEvents.Do(e => guidSet.Add(e.identifier));
Run Code Online (Sandbox Code Playgroud)

如果您遇到问题,您必须等待10秒然后立即处理最后10秒,那么您只需要缓冲10秒钟:

var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });
Run Code Online (Sandbox Code Playgroud)

我没有展示滑动10秒窗口的例子,因为我无法想象这就是你想要的(事件被多次处理).


现在我们认真了.假设你不想依赖墙上时间,而是希望利用事件中的时间来驱动你的逻辑.假设事件被重新定义为:

 public class Event
 {
      public Guid identifier;
      public DateTime ts;
 }
Run Code Online (Sandbox Code Playgroud)

创建历史调度程序并从原始调度程序中提取计划的事件:

var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));   
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));
Run Code Online (Sandbox Code Playgroud)

现在你可以简单地使用常规的Rx组合器来target代替event,只需通过调度程序就可以适当地触发它们,例如:

var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);
Run Code Online (Sandbox Code Playgroud)

这是一个简单的测试.每隔"相隔30秒"创建一百个事件,但实时触发每秒:

var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
    Scheduler.ThreadPool.Schedule(
        TimeSpan.FromSeconds(i), 
        () => events.OnNext(new Event() { 
            identifier = Guid.NewGuid(), 
            ts = now.AddSeconds(i * 30) 
        })
    )
).ToList();
Run Code Online (Sandbox Code Playgroud)

订阅并请求60秒的缓冲事件 - 实际上每2'真实'秒(60虚拟秒)实际接收2个事件:

target.Select(e => String.Format("{0} {1}", e.identifier, e.ts.ToString()))
      .Buffer(TimeSpan.FromSeconds(60), scheduler)
      .Select(es => String.Join(" - ", es))
      .DumpLive();
Run Code Online (Sandbox Code Playgroud)