Fuz*_*zzY 5 java memory-leaks reactive-programming rx-java reactivex
根据以下文档groupBy:
注意:A
GroupedObservable将缓存要发出的项目,直到被订阅为止。因此,为了避免内存泄漏,您不应简单地忽略与您无关的那些GroupedObservable。相反,您可以向他们发送信号,告知他们可以通过take(int)(0)对它们应用类似的运算符来丢弃其缓冲区。
有一个RxJava教程说:
在内部,每个Rx运算符都会执行3件事
- 它订阅源并观察值。
- 它根据操作员的目的转换观察到的序列。
- 通过调用onNext,onError和onCompleted,它将修改后的序列推入自己的订阅者。
让我们看一下下面的代码块,该代码块仅从中提取偶数range(0, 10):
Observable.range(0, 10)
.groupBy(i -> i % 2)
.filter(g -> g.getKey() % 2 == 0)
.flatMap(g -> g)
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
我的问题是:
这是否意味着filter运营商已经暗示要对由其产生的每个组进行订购,groupBy或者仅仅是对一个组进行订购Observable<GroupedObservable>?
在这种情况下会不会发生内存泄漏?如果是这样的话,
如何正确丢弃这些组?替换filter为自定义变量,take(0)后跟一个return Observable.empty()?您可能会问为什么我不直接返回take(0):这是因为filter并不一定要紧随其后groupBy,而是可以在链中的任何地方,并且涉及更复杂的条件。
除了内存泄漏外,由于内部请求协调问题,当前的实现可能最终会完全停止。
请注意,使用take(0),可以一直重新创建组。我会改用ignoreElements丢弃值的标签,没有项目到达,flatMap并且不会一直重新创建组本身。
g您的怀疑是正确的,因为要正确处理分组可观察量,必须订阅每个内部可观察量 ( )。filter与订阅外部可观察值一样,这只是一个坏主意。只需在flatMap使用中执行您需要的操作ignoreElements即可过滤掉不需要的组。
Observable.range(0, 10)
.groupBy(i -> i % 2)
.flatMap(g -> {
if (g.getKey() % 2 == 0)
return g;
else
return g.ignoreElements();
})
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
781 次 |
| 最近记录: |