了解可流动背压 rxjava2

FRR*_*FRR 2 rx-java2

我把这个虚拟例子放在一起,试图backpressure更好地理解:

Flowable.range(1, 100).onBackpressureDrop()
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribeWith(object : DisposableSubscriber<Int>() {
                        override fun onStart() {
                          request(1)
                        }

                        override fun onComplete() {
                          Log.d(this@MainActivity::class.java.simpleName, "onComplete")
                        }

                        override fun onNext(t: Int?) {
                          Log.d(this@MainActivity::class.java.simpleName, t.toString())
                          Thread.sleep(1000)
                          request(1)
                        }

                        override fun onError(t: Throwable?) { //handle error}
                      })
Run Code Online (Sandbox Code Playgroud)

我有一个非常慢的Subscriber消耗数据的非常快的Flowable. 我正在指示 Flowable 到onBackPressureDrop(). 尽管如此,我的输出看起来像这样(从 1 到 100)

07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.105 22389-22389 D: 99
07-16 23:07:25.105 22389-22389 D: 100
07-16 23:07:25.107 22389-22389 D: onComplete
Run Code Online (Sandbox Code Playgroud)

我期待缺少元素,因为订阅者非常慢,但事实并非如此,从 1 到 100 的所有数字都打印到控制台,每秒一个。

接下来,我尝试一次请求所有值。所以我取代request(1)onStartrequest(Long.MAX_VALUE)和删除request(1)onNext通话。但它仍然打印数字 1 到 100,没有缺少元素。

所以我想知道如何为慢速订阅者模拟订阅者丢失事件?如何使背压异常发生?

谢谢

aka*_*okd 6

observeOn默认内部缓冲区大小为 128,这就是为什么您看不到元素被丢弃的原因,因为它可以简单地缓冲您正在生成的所有 100 个元素。您可以将缓冲区大小设置为 1 viaobserveOn(mainThread(), false, 1)并体验下降。