use*_*889 115 java android rx-java
我一直在寻找新的rx java 2,我不太确定我理解了这个想法backpressure......
我知道我们有Observable没有backpressure支持,Flowable而且有它.
因此,基于例如,可以说我有flowable有interval:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Run Code Online (Sandbox Code Playgroud)
这将在大约128个值之后崩溃,这很明显我消耗的速度比获取项目慢.
但后来我们也一样 Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Run Code Online (Sandbox Code Playgroud)
这根本不会崩溃,即使我延迟消耗它仍然有效.为了让Flowable工作让我说我放onBackpressureDrop操作员,崩溃已经消失,但并非所有值都被发出.
所以我目前无法找到答案的基本问题是我为什么要关心backpressure何时可以使用普通Observable仍然可以在不管理的情况下获得所有值buffer?或者从另一方面来看,有什么优势backpressure可以帮助我管理和处理消费?
aka*_*okd 111
在实践中表现出的背压是有界缓冲区,Flowable.observeOn有128个元素的缓冲区,它们可以像下游一样快速地消耗掉它.您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍适用于1.x. Observable.observeOn有一个无限制的缓冲区,不断收集元素,你的应用程序可能会耗尽内存.
您可以使用Observable例如:
您可以使用Flowable例如:
j2e*_*nue 94
Backpressure是指您的observable(发布者)创建的事件多于订阅者可以处理的事件.因此,您可以让订阅者丢失事件,或者您可以获得一个巨大的事件队列,这些事件最终会导致内存不足. Flowable考虑背压. Observable才不是.而已.
它让我想起了一个漏斗,当它有太多的液体溢出时.Flowable可以帮助我们避免这种情况发生:
具有巨大的背压:
但是使用可流动的,背压要小得多:
Rxjava2有一些背压策略,你可以根据你的用例使用.通过策略我的意思是Rxjava2提供了一种处理由于溢出(背压)而无法处理的对象的方法.
这是策略. 我不会详细介绍它们,但是例如,如果您不想担心溢出的项目,可以使用这样的放置策略:
observable.toFlowable(BackpressureStrategy.DROP)
据我所知,队列上应该有128个项目限制,之后可能会出现溢出(背压).即使它不是128接近那个数字.希望这有助于某人.
如果你需要从128更改缓冲区大小,它看起来像这样可以完成(但看看任何内存约束:
myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.
Run Code Online (Sandbox Code Playgroud)
Ego*_*gor 14
在Flowable没有背压处理的情况下发出128个值后崩溃的事实并不意味着它在经过128个值之后总会崩溃:有时它会在10之后崩溃,有时它根本不会崩溃.我相信当你尝试这个例子时会发生这种情况Observable- 碰巧没有背压,所以你的代码正常工作,下次可能没有.RxJava 2的不同之处在于,Observables中没有背压概念,也无法处理它.如果您正在设计可能需要明确背压处理的反应性序列 - 那么这Flowable是您的最佳选择.
| 归档时间: |
|
| 查看次数: |
37612 次 |
| 最近记录: |