Rob*_*ner 5 rxjs subject-observer angular rxjs-observables
我正在实现一个角度服务,让消费者根据他们的 id 观察各种值:
它的本质是这样的:
private subjects = new Map<number, Subject<any>>();
public subscribe(id: number, observer: any): Subscription {
// try getting subject for this id (or undefined if it does not yet exist)
let subj = this.subjects.get(id);
// create subject if it does not yet exist
if (!subj) {
subj = new Subject<any>();
this.subjects.set(id, subj);
}
// subscribe observer
const subscription = subj.subscribe(observer);
// set up teardown logic (gets called when subscription is unsubscribed)
subscription.add(() => {
// remove subject from the map, if this was the last subscription
if (subj.observers.length == 0) {
this.subjects.delete(id);
}
});
// return subscription
return subscription;
}
Run Code Online (Sandbox Code Playgroud)
上面的方法工作正常,但 API 使用起来有点麻烦(在消费者中,我需要手动跟踪所有订阅并确保正确取消订阅)。
我更喜欢有一个返回Observable这样的方法:
public subscribe(id: number): Observable<any> {
// TODO: Return an observable for this id and make sure that
// its corresponding subject is in the map iff at least one of the observables
// for this id has at least one subscription.
return ...;
}
Run Code Online (Sandbox Code Playgroud)
因为这将允许我使用管道直接从组件模板订阅我需要的值async,其中 Angular 将负责取消订阅观察者。
但我不太清楚如何实现逻辑以在不再使用时删除未使用的Subjects 。Map有没有好的方法可以做到这一点?
我想你可以尝试这样的事情:
function subscribe(id: number): Observable<any> {
/* ... */
return sbj
.pipe(
finalize(() => {
if (subj.observers.length == 0) {
this.subjects.delete(id);
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
这样,您还可以将异步管道与AnonymousSubjectby 返回Subject.lift(作为 的结果调用Subject.pipe())一起使用。AnonymousSubject确保观察者(例如来自模板的观察者)将被添加到“AnonymousSubject's parent 主题”的列表中。
finalize()Subject当源(例如)取消订阅时调用。这可能发生在组件被销毁时,或者发生complete/error事件时,其中还包括完成时的情况Subject。当aSubject完成时,它将向其所有订阅者发送完整通知,这意味着观察者最终将自动从aSubject的观察者列表中删除。
show1 = true;
show12 = true;
show2 = true;
v1$: Observable<any>;
v12$: Observable<any>;
v2$: Observable<any>;
constructor(public valueService: ValueService) {
}
async ngOnInit() {
await this.sleep(2000);
// const s11 = this.valueService.subscribe(1, v => this.v1 = v);
this.v1$ = this.valueService.subscribe(1);
await this.sleep(2000);
// const s21 = this.valueService.subscribe(2, v => this.v2 = v);
this.v2$ = this.valueService.subscribe(2);
await this.sleep(2000);
// const s12 = this.valueService.subscribe(1, () => {});
this.v12$ = this.valueService.subscribe(1);
await this.sleep(2000);
// s12.unsubscribe();
this.show12 = false
await this.sleep(2000);
// s11.unsubscribe();
this.show1 = false;
await this.sleep(2000);
// s21.unsubscribe();
this.show2 = false
}
Run Code Online (Sandbox Code Playgroud)
<div *ngIf="show1">
v1: {{ v1$ | async }}
</div>
<div *ngIf="show12">
v12: {{ v12$ | async }}
</div>
<div *ngIf="show2">
v2: {{ v2$ | async }}
</div>
Run Code Online (Sandbox Code Playgroud)
public subscribe(id: number): Observable<any> {
let subj = this.subjects.get(id);
if (!subj) {
subj = new Subject<any>();
this.subjects.set(id, subj);
}
return subj.pipe(
finalize(() => {
if (subj.observers.length === 1) {
this.subjects.delete(id);
}
})
)
}
Run Code Online (Sandbox Code Playgroud)
正如 @ggradnig 提到的,检查应该是subj.observers.length === 1,因为finalize()至少在 RxJs 中6.5.x,在任何其他取消订阅发生之前运行其回调。