RxJs操作员组可以泄漏内存吗?

MFa*_*ave 2 functional-programming reactive-programming rxjs

我试图围绕RxJs运算符的用例,groupBy我担心在某些情况下它可能会导致内存泄漏.

我熟悉传统意义上的groupBy(例如同步列表处理).我将写出一个groupBy函数来引用:

const groupBy = f => list =>
  list.reduce((grouped, item) => {
    const category = f(item);
    if (!(category in grouped)) {
      grouped[category] = [];
    }
    grouped[category].push(item);
    return grouped;
  }, {});

const oddsAndEvens = x => x % 2 === 0 ? 'EVEN' : 'ODD';

compose(
  console.log,
  groupBy(oddsAndEvens)
)([1,2,3,4,5,6,7,8])

// returns: { ODD: [ 1, 3, 5, 7 ], EVEN: [ 2, 4, 6, 8 ] }
Run Code Online (Sandbox Code Playgroud)

请注意,这在更广泛的范围内是无状态的.我假设RxJs做了类似于此的事情,在EVEN和ODD的位置会返回observables,并且它会在一些行为类似于集合的状态中跟踪状态.如果我错了,请纠正我,重点是我认为RxJs必须维护所有分组的有状态列表.

我的问题是,如果分组值的数量(本例中只是偶数和奇数)不是有限的,会发生什么?例如,一个流,它为您提供唯一标识符,以保持流的生命周期的一致性.如果你按照这个标识符进行分组,那么RxJs的groupBy运算符会不断创建越来越多的组,即使再次重新访问旧的标识符也是如此?

Zah*_*hiC 5

如果您的流是无限的,并且您的密钥选择器可以生成无限组,那么 - 是的,您有内存泄漏.

您可以为每个分组的可观察对象设置持续时间选择器.的持续时间选择为每个组和信号上的组的期满创建.

rxjs 5+:groupBy第3个参数.

rxjs 4:使用groupedByUntil运算符.

以下是无限流的示例,其中每个分组的Observable在3秒后关闭.

Rx.Observable.interval(200)
  .groupBy(
    x => Math.floor(x / 10),
    x => x,
    x$ => Rx.Observable.timer(3000).finally(() => console.log(`closing group ${x$.key}`))
  )
  .mergeMap(x$ => x$.map(x => `group ${x$.key}: ${x}`))
  .subscribe(console.log)
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)