相关疑难解决方法(0)

处理项目时缓冲

我有一个定期发射的事件.我们假设处理事件大约需要1秒.我没有等待每个接收到的事件1,而是想累积事件直到最后一次处理完成.处理完成后,我想处理上次处理过程中收到的事件数据:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 - 
is finished the processing of the events e2 and e3 takes place. 

This proces is …
Run Code Online (Sandbox Code Playgroud)

.net c# reactive-programming system.reactive

4
推荐指数
1
解决办法
431
查看次数

像Lmax Disruptor一样可以批量观察

那些熟悉lmax环形缓冲区( disruptor )的人都知道,该数据结构的最大优点之一就是它批准了事件,当我们有一个可以利用批处理的消费者时,系统可以自动调整负载,你抛出的事件越多越好.

我想我们无法通过Observable实现相同的效果(定位批处理功能).我已经尝试过Observable.buffer,但这是非常不同的,缓冲区将等待并且在预期的事件数量没有到达时不会发出批处理.我们想要的是完全不同的.

如果subriber正在等待一个批次Observable<Collection<Event>>,当一个项目到达流时,它会发出一个由订阅者处理的单个元素批处理,而正在处理其他元素到达并收集到下一个批处理中,一旦订阅者完成执行它获得下一批与自上次处理开始以来已经到达的事件数...

因此,如果我们的订户足够快以一次处理一个事件,它将这样做,如果负载变高,它仍将具有相同的处理频率,但每次更多事件(从而解决背压问题)...不同缓冲区将坚持并等待批次填满.

有什么建议?或者我应该使用环形缓冲区?

java reactive-programming observable disruptor-pattern rx-java

2
推荐指数
1
解决办法
599
查看次数