我有一个定期发射的事件.我们假设处理事件大约需要1秒.我没有等待每个接收到的事件1,而是想累积事件直到最后一次处理完成.处理完成后,我想处理上次处理过程中收到的事件数据:
e1 e2 e3 e4 e5 e6 e7 events happening
---------------------------------------------------------------------------------------------------------------------------------------------------> time
1s 2s 3s 4s 5s 6s
p(e1) p(e2, e3) p(e4) p(e5, e6) p(e7)
[-----------------------][-----------------------] [-----------------------][-----------------------][-----------------------] processing of items
In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 -
is finished the processing of the events e2 and e3 takes place.
This proces is …Run Code Online (Sandbox Code Playgroud) 在为这里描述的问题找不到合适的解决方案后,我决定实施这个。
然而,我缺乏使用 monad 的经验,而像 Lift(..) 这样的东西对我来说仍然有点神奇......
我打开它的目的是让那些在 rxjava 之上实现了一些自定义东西的人可以就如何实现这一点给我建议。
现在这是什么,这是界面。
我想这对你们大多数人来说都是不言自明的,但为了确保我会举一个例子。
想象一下我们有一个订阅者(消费者)实际上对数据库进行持久化,很明显,如果你给它 1 或 1000 个对象来持久化,差异不会是 1000 的因数,它将是 10 的因数或更少,这意味着它是一个消费者可以通过加载......所以一次推送一个项目是愚蠢的,而你可以一次坚持很多,另一方面,等待一批 N 个元素填满直到你坚持(一个第二个你可能会得到 1000 个元素,你可能没有得到,所以假设我们不知道传入数据的频率)...
所以我们现在拥有的是Observable.batch()它会要求一些 N 大小的批次,我们经常会等待而不工作......另一方面,我们拥有Disruptor它完全符合我们想要的但不提供Observable......的漂亮界面Disruptor将处理单个元素,当您处理它时,它将收集所有传入的元素,下次您将获得一批由于您的消费者忙于处理最后一个值而收集的所有内容......
目前我想我将Observable.from()用来实现这个或lift()......
请分享您对此的想法,也许我不知道已经有可用的解决方案,或者我即将以错误的方式实现它......