MFa*_*ave 2 functional-programming reactive-programming rxjs
我试图围绕RxJs运算符的用例,groupBy我担心在某些情况下它可能会导致内存泄漏.
我熟悉传统意义上的groupBy(例如同步列表处理).我将写出一个groupBy函数来引用:
const groupBy = f => list =>
list.reduce((grouped, item) => {
const category = f(item);
if (!(category in grouped)) {
grouped[category] = [];
}
grouped[category].push(item);
return grouped;
}, {});
const oddsAndEvens = x => x % 2 === 0 ? 'EVEN' : 'ODD';
compose(
console.log,
groupBy(oddsAndEvens)
)([1,2,3,4,5,6,7,8])
// returns: { ODD: [ 1, 3, 5, 7 ], EVEN: [ 2, 4, 6, 8 ] }
Run Code Online (Sandbox Code Playgroud)
请注意,这在更广泛的范围内是无状态的.我假设RxJs做了类似于此的事情,在EVEN和ODD的位置会返回observables,并且它会在一些行为类似于集合的状态中跟踪状态.如果我错了,请纠正我,重点是我认为RxJs必须维护所有分组的有状态列表.
我的问题是,如果分组值的数量(本例中只是偶数和奇数)不是有限的,会发生什么?例如,一个流,它为您提供唯一标识符,以保持流的生命周期的一致性.如果你按照这个标识符进行分组,那么RxJs的groupBy运算符会不断创建越来越多的组,即使再次重新访问旧的标识符也是如此?
如果您的流是无限的,并且您的密钥选择器可以生成无限组,那么 - 是的,您有内存泄漏.
您可以为每个分组的可观察对象设置持续时间选择器.的持续时间选择为每个组和信号上的组的期满创建.
rxjs 5+:groupBy第3个参数.
rxjs 4:使用groupedByUntil运算符.
以下是无限流的示例,其中每个分组的Observable在3秒后关闭.
Rx.Observable.interval(200)
.groupBy(
x => Math.floor(x / 10),
x => x,
x$ => Rx.Observable.timer(3000).finally(() => console.log(`closing group ${x$.key}`))
)
.mergeMap(x$ => x$.map(x => `group ${x$.key}: ${x}`))
.subscribe(console.log)Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>Run Code Online (Sandbox Code Playgroud)