禁用Flowable/Observable Buffer

Luc*_*ini 4 java reactive-programming java-8 rx-java rx-java2

我遇到的问题始终是使用combineLatest运算符获得的组合中的最后一个值.

我有2个热流道(a,b)以高频率生成事件(每100毫秒一个事件):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);
Run Code Online (Sandbox Code Playgroud)

结合combineLatest,需要将近300毫秒才能完成它的工作.

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,        OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
                System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
                System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
                Thread.sleep(300);
            }
Run Code Online (Sandbox Code Playgroud)

在一次执行组合器之后,我想处理最后生成的事件组合,意思是(lastA,lastB).

组合流的默认行为是将所有事件组合缓存在其自己的缓冲区中,以便组合流可以接收非常旧的组合,并且此时间隙正在爆炸.

我应该如何更改我的代码以禁用此缓冲区并始终接收最后一个组合?

aka*_*okd 5

您可以应用onBackpressureLatest上都flowAflowB和使用的过载combineLatest这让你指定的预取量.

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
    1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)
Run Code Online (Sandbox Code Playgroud)

不幸的是,没有任何重载同时需要BiFunction和bufferSize,因此您必须恢复为转换数组元素.

编辑

应用秒onBackpressureLatest和限制缓冲区大小observeOn可以使您更接近所需的模式,但combineLatest不是针对此用例.您可能想要一些多样本运算符.

  • 更新了我的回答. (2认同)