处理项目时缓冲

Pet*_*ons 4 .net c# reactive-programming system.reactive

我有一个定期发射的事件.我们假设处理事件大约需要1秒.我没有等待每个接收到的事件1,而是想累积事件直到最后一次处理完成.处理完成后,我想处理上次处理过程中收到的事件数据:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 - 
is finished the processing of the events e2 and e3 takes place. 

This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been 
checked in during the build will then be processed.
Run Code Online (Sandbox Code Playgroud)

我应该如何使用Rx?

我已经尝试过将Buffer与开启和关闭选择器结合使用,但我无法正确使用它.任何例子或方向都表示赞赏!

我们假设a Subject<int>作为输入流.

我尝试过这样的事情,但我完全迷失了.

var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
    bc.OnNext(true);
    String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
    Thread.Sleep(300);
    bc.OnNext(false);
});
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 6

这是非繁琐的.幸运的是,@ DaveSexton已经完成了所有艰苦的工作.你想BufferIntrospective从Rxx库.在这里查看来源.

这很难的原因是因为IObserver<T>没有内置的方法来发出反压信号 - 除了阻止OnXXX调用的微妙之处.Observable需要注意Observer,你需要引入并发来管理缓冲.

另请注意,如果您有多个订阅者,他们将获得不同的数据,因为他们收到的数据取决于源事件率和他们的消费率.

另一种方法是将所有事件添加到OnNext处理程序中的线程安全队列中,并且有一个单独的任务在循环中清空队列.BufferIntrospective虽然可能更干净.

有一点玩,这个玩具实现似乎工作.但是Rxx会更强大,所以这只是教学上的真实,以显示涉及到什么类型的东西.关键是通过调度程序引入并发.

public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<IList<TSource>>(o => {
        Subject<Unit> feedback = new Subject<Unit>();
        var sourcePub = source.Publish().RefCount();
        var sub = sourcePub.Buffer(
            () => feedback).ObserveOn(scheduler).Subscribe(@event =>
            {                
                o.OnNext(@event);
                feedback.OnNext(Unit.Default);
            },
            o.OnError,
            o.OnCompleted);
        var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
        return new CompositeDisposable(sub, start);
    });        
}
Run Code Online (Sandbox Code Playgroud)

此示例代码显示了使用情况以及两个不同节奏的订阅者如何获得不同的事件缓冲,一个接收批次为5,其他批次为10.

我使用LINQPadDump轻松显示每个缓冲区的内容.

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});
Run Code Online (Sandbox Code Playgroud)

  • @Expecho的"BufferIntrospective"James链接的版本有[一个问题](http://rxx.codeplex.com/workitem/23869),它实际上是异步订阅源流,因此当它与热一起使用时来源,如果您不知道这一点,可能会丢失一些初始项目.如果您使用[最新版本],则不应该出现此问题(https://rxx.codeplex.com/SourceControl/latest#Main/Source/Rxx.Linq-Net451/System/Reactive/Linq/Observable2%20-% 20Introspection.cs) (2认同)