Observable vs Flowable rxJava2

use*_*889 115 java android rx-java

我一直在寻找新的rx java 2,我不太确定我理解了这个想法backpressure......

我知道我们有Observable没有backpressure支持,Flowable而且有它.

因此,基于例如,可以说我有flowableinterval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });
Run Code Online (Sandbox Code Playgroud)

这将在大约128个值之后崩溃,这很明显我消耗的速度比获取项目慢.

但后来我们也一样 Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });
Run Code Online (Sandbox Code Playgroud)

这根本不会崩溃,即使我延迟消耗它仍然有效.为了让Flowable工作让我说我放onBackpressureDrop操作员,崩溃已经消失,但并非所有值都被发出.

所以我目前无法找到答案的基本问题是我为什么要关心backpressure何时可以使用普通Observable仍然可以在不管理的情况下获得所有值buffer?或者从另一方面来看,有什么优势backpressure可以帮助我管理和处理消费?

aka*_*okd 111

在实践中表现出的背压是有界缓冲区,Flowable.observeOn有128个元素的缓冲区,它们可以像下游一样快速地消耗掉它.您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍适用于1.x. Observable.observeOn有一个无限制的缓冲区,不断收集元素,你的应用程序可能会耗尽内存.

您可以使用Observable例如:

  • 处理GUI事件
  • 使用短序列(总共少于1000个元素)

您可以使用Flowable例如:

  • 冷和非定时源
  • 发电机像源
  • 网络和数据库访问器

  • 是的,`Maybe`、`Single` 和 `Completable` _far_ 太小了,根本不需要背压概念。生产者不可能以比消耗更快的速度发出物品,因为将永远生产或消耗 0-1 个物品。 (2认同)

j2e*_*nue 94

Backpressure是指您的observable(发布者)创建的事件多于订阅者可以处理的事件.因此,您可以让订阅者丢失事件,或者您可以获得一个巨大的事件队列,这些事件最终会导致内存不足. Flowable考虑背压. Observable才不是.而已.

它让我想起了一个漏斗,当它有太多的液体溢出时.Flowable可以帮助我们避免这种情况发生:

具有巨大的背压:

在此输入图像描述

但是使用可流动的,背压要小得多:

在此输入图像描述

Rxjava2有一些背压策略,你可以根据你的用例使用.通过策略我的意思是Rxjava2提供了一种处理由于溢出(背压)而无法处理的对象的方法.

这是策略. 我不会详细介绍它们,但是例如,如果您不想担心溢出的项目,可以使用这样的放置策略:

observable.toFlowable(BackpressureStrategy.DROP)

据我所知,队列上应该有128个项目限制,之后可能会出现溢出(背压).即使它不是128接近那个数字.希望这有助于某人.

如果你需要从128更改缓冲区大小,它看起来像这样可以完成(但看看任何内存约束:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  
Run Code Online (Sandbox Code Playgroud)

  • 这些图像在骗我。丢弃事件不会在底部得到“更多的钱”。 (2认同)

Ego*_*gor 14

Flowable没有背压处理的情况下发出128个值后崩溃的事实并不意味着它在经过128个值之后总会崩溃:有时它会在10之后崩溃,有时它根本不会崩溃.我相信当你尝试这个例子时会发生这种情况Observable- 碰巧没有背压,所以你的代码正常工作,下次可能没有.RxJava 2的不同之处在于,Observables中没有背压概念,也无法处理它.如果您正在设计可能需要明确背压处理的反应性序列 - 那么这Flowable是您的最佳选择.