rxjs Observable:处理所有订阅的取消订阅

ar0*_*968 4 javascript rxjs

我有一个返回 Observable 的方法

subFoo(id): Observable<number> {
    return new Observable<number>(observer => {
        setTimeout(() => {
            observer.next(id);
        }, 1000);
    });
}
Run Code Online (Sandbox Code Playgroud)

现在我订阅了三次,5秒后全部取消订阅:

const sub1 = subFoo(1).subscribe(result => console.log(result));
const sub2 = subFoo(2).subscribe(result => console.log(result));
const sub3 = subFoo(3).subscribe(result => console.log(result));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
  sub3.unsubscribe();
}, 5000);
Run Code Online (Sandbox Code Playgroud)

我可以处理所有听众的完全取消订阅吗?

例如。(伪代码):

subFoo(id): Observable<number> {
    return new Observable<number>(observer => {

        // something like this
        observer.onAllListenerAreUnsubscribed(() => {
           console.log('All Listener Are Unsubscribed!');
        });

        setTimeout(() => {
            observer.next(id);
        }, 1000);
    });
}
Run Code Online (Sandbox Code Playgroud)

现场演示:https ://stackblitz.com/edit/angular-ayl12r

mar*_*tin 6

编辑 2022 年 7 月:自 RxJS 7.0 以来可以使用tap({ subscribe: () => ... }).

Observable 无法知道对其链的订阅。如果您想知道某人订阅了多少次,您可以自己数一下:

let subscriptions = 0;

subFoo(id): Observable<number> {
  return new Observable<number>(observer => {
    subscriptions++;
    ...
    return (() => {
      if (--subscriptions === 0) {
        // do whatever...
      }
      ...
    })
  })
})
Run Code Online (Sandbox Code Playgroud)

您还可以将“观察者端”上的所有订阅收集到单个订阅中,然后在取消订阅时添加自定义处理程序:

const subs = new Subscription();
subs.add(subFoo(1).subscribe(...));
subs.add(subFoo(2).subscribe(...));
subs.add(subFoo(3).subscribe(...));
subs.add(() => {
  // do whatever...
});

subs.unsubscribe(); // Will unsubscribe all subscriptions and then call your custom method.
Run Code Online (Sandbox Code Playgroud)