groupBy,Rx中的过滤器和内存泄漏

Fuz*_*zzY 5 java memory-leaks reactive-programming rx-java reactivex

根据以下文档groupBy

注意:A GroupedObservable将缓存要发出的项目,直到被订阅为止。因此,为了避免内存泄漏,您不应简单地忽略与您无关的那些GroupedObservable。相反,您可以向他们发送信号,告知他们可以通过take(int)(0)对它们应用类似的运算符来丢弃其缓冲区。

有一个RxJava教程说:

在内部,每个Rx运算符都会执行3件事

  1. 它订阅源并观察值。
  2. 它根据操作员的目的转换观察到的序列。
  3. 通过调用onNext,onError和onCompleted,它将修改后的序列推入自己的订阅者。

让我们看一下下面的代码块,该代码块仅从中提取偶数range(0, 10)

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

我的问题是:

  1. 这是否意味着filter运营商已经暗示要对由其产生的每个组进行订购,groupBy或者仅仅是对一个组进行订购Observable<GroupedObservable>

  2. 在这种情况下会不会发生内存泄漏?如果是这样的话,

  3. 如何正确丢弃这些组?替换filter为自定义变量,take(0)后跟一个return Observable.empty()?您可能会问为什么我不直接返回take(0):这是因为filter并不一定要紧随其后groupBy,而是可以在链中的任何地方,并且涉及更复杂的条件。

aka*_*okd 5

除了内存泄漏外,由于内部请求协调问题,当前的实现可能最终会完全停止。

请注意,使用take(0),可以一直重新创建组。我会改用ignoreElements丢弃值的标签,没有项目到达,flatMap并且不会一直重新创建组本身。


Dav*_*ten 4

g您的怀疑是正确的,因为要正确处理分组可观察量,必须订阅每个内部可观察量 ( )。filter与订阅外部可观察值一样,这只是一个坏主意。只需在flatMap使用中执行您需要的操作ignoreElements即可过滤掉不需要的组。

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
       if (g.getKey() % 2 == 0) 
         return g;
       else 
         return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)