如何在Reactive Extensions中将项目缓冲到组中?

For*_*say 3 c# linq system.reactive observable observer-pattern

我有一个IObservable; 其中属性更改具有实体ID和PropertyName.我想用它来更新数据库,但是如果多个属性几乎同时发生变化,我只想对同一个实体的所有属性进行一次更新.

如果这是一个静态IEnumerable并且我使用LINQ我可以简单地使用:

MyList.GroupBy(C=>C.EntityID);
Run Code Online (Sandbox Code Playgroud)

但是,列表永远不会终止(从不调用IObserver.OnComplete).我想要做的是等待一段时间,比如1秒钟,将所有呼叫分组适当的一秒钟.

理想情况下,我会为每个EntityID设置单独的计数器,只要找到该EntityID的新属性更改,它们就会重置.

我不能使用像Throttle这样的东西,因为我想处理所有的属性更改,我只是想一次性处理它们.

Eni*_*ity 7

干得好:

MyObservable
    .Buffer(TimeSpan.FromSeconds(1.0))
    .Select(MyList =>
        MyList.GroupBy(C=>C.EntityID));
Run Code Online (Sandbox Code Playgroud)