即使没有活跃订阅者,我使用 shareReplay() 的 Observable 会永远运行吗?

Tim*_*Tim 4 observable rxjs

考虑这个Observable

myObs$ = interval(1000).pipe(shareReplay(2));
Run Code Online (Sandbox Code Playgroud)

及其用法:

myObs$.subscribe(res => console.log('[Subscriber 1]:' + res));
Run Code Online (Sandbox Code Playgroud)

如果我订阅,然后在几秒钟后取消订阅,最后在几秒钟后再次重新订阅,似乎间隔一直在运行和计数

[Subscriber 1]: 0
[Subscriber 1]: 1 
Unsubscribe, resubscribe here
[Subscriber 1]: 3
[Subscriber 1]: 4
[Subscriber 1]: 5
[Subscriber 1]: 6 
Unsubscribe, resubscribe here
[Subscriber 1]: 10
[Subscriber 1]: 11
[Subscriber 1]: 12
Run Code Online (Sandbox Code Playgroud)

我知道使用 refCount = true 时不会发生这种情况。但是当它为 false 时,这是否算作潜在的内存泄漏?如果没有,我该如何阻止它?

另外,为什么取消订阅后需要重新创建订阅?

sub = new Subscription()
sub.add(myObs.subscribe())
sub.unsubscribe()
sub.add(myObs.subscribe()) // <-- this does not work unless I recreate a new Subscription()
Run Code Online (Sandbox Code Playgroud)

use*_*er1 5

使用shareReplywithoutrefCount可能会导致内存泄漏,因为在每个人都取消订阅后,运算符不会自动关闭流。您可以使用takeUntil运算符来完成一个长期存在的流,该运算符接受另一个流,并且与Subject.

const sub = new Subject();

const stream$ = interval(1000).pipe(
  takeUntil(sub),
  shareReplay(1)
);

setTimeout(() => stream$.subscribe(console.log), 4000);
setTimeout(() => sub.next(), 6000);
setTimeout(() => stream$.subscribe(undefined, undefined, () => console.log('completed')), 8000); // this will log completed because the stream$ has successfully finished
Run Code Online (Sandbox Code Playgroud)

完成流的另一种方法是使用takeWhile,它也非常方便,并且它使用谓词函数。

如果您处置订阅,它会被标记为已关闭,并且您需要创建一个新订阅,并且无法再次重复使用。

看一下这一行,其中 add 方法内部有一个检查,如果 close 为 true,则仅对提供的​​订阅执行拆卸。