仅使用 onBackPressureLatest() 消耗最新项目

Joe*_*ard 3 java rx-java2

我有一个定期发出物品的生产者和一个有时非常慢的消费者。重要的是,消费者只使用最近的商品。我认为 onBackPressureLatest() 是这个问题的完美解决方案。于是我写了如下测试代码:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}
Run Code Online (Sandbox Code Playgroud)

我预计它会记录类似以下内容:

Produce: 0
...
Produce: 9
Consume: 0
Consume: 9
Run Code Online (Sandbox Code Playgroud)

相反,我得到

Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9
Run Code Online (Sandbox Code Playgroud)

onBackPressureLatest() 和 onBackPressureDrop() 都没有任何效果。只有 onBackPressureBuffer(i) 会引发异常。

我使用 rxjava 2.1.9。你知道问题/我的误解可能是什么吗?

aka*_*okd 6

observeOn有一个内部缓冲区(默认 128 个元素),可以立即轻松拾取所有源项目,因此onBackpressureLatest始终被完全消耗。

编辑:

您可以创建的最小缓冲区是 1,它应该提供所需的模式:

source.onBackpressureLatest()
      .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
      .subscribe(v -> { /* ... */ });
Run Code Online (Sandbox Code Playgroud)

(前面的delay+rebatchRequest组合实际上与此等效)。