如何在最后一个观察者取消订阅时阻止AsyncSubject完成

Max*_*kyi 3 javascript rxjs

AsyncSubject成为观察到时,来自对象的最后一个主题观察者取消订阅.这是引用:

完成后,就完成了.主题在取消订阅,完成或错误后不能重复使用.

这是演示:

const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();

ofObservable.subscribe(subject);

subject.subscribe((v) => {
    console.log(v);
});

subject.unsubscribe((v) => {
    console.log(v);
});

// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
    console.log(v);
});
Run Code Online (Sandbox Code Playgroud)

如何防止主题完成?

有一个share运营商:

在RxJS 5中,操作员share()创建一个热的,refCounted observable,可以在失败时重试,或在成功时重复.由于受试者在错误,完成或以其他方式取消订阅后无法重复使用,因此share() 操作员将回收死亡的受试者,以便重新订阅所得到的观察者.

这就是我要找的东西.但share创造了一个主题,我需要AsyncSubject.

car*_*ant 11

问题出在这一行:

subject.unsubscribe((v) => {
    console.log(v);
});
Run Code Online (Sandbox Code Playgroud)

Subject 实施ISubscription ; 这意味着它有一个unsubscribe方法和一个closed属性.其实施unsubscribe如下:

unsubscribe() {
  this.isStopped = true;
  this.closed = true;
  this.observers = null;
}
Run Code Online (Sandbox Code Playgroud)

这有点残酷.从本质上讲,它会切断与主题的任何订阅者的所有通信,而不会取消订阅.同样,它也不会取消订阅主题本身,也不会取消订阅可能发生的任何观察.(它还将主题标记为已关闭/已停止,这是导致错误的原因.)

鉴于它没有实际执行任何取消订阅,它应该如何使用尚不清楚.这个测试的描述:

it('should disallow new subscriber once subject has been disposed', () => {
Run Code Online (Sandbox Code Playgroud)

表明它可能是某种来自RxJS 4的宿醉 - 其中取消订阅被称为处置.

不管它存在的原因是什么,我建议永远不要叫它.举个例子,看看这个片段:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

它将主题订阅到源可观察对象,然后订阅由主题组成的可观察对象.

如果unsubscribe对这个主题进行调用,则会出现一些问题:

  • 主体对源的订阅没有取消订阅,当源尝试调用主题的next方法时会产生错误; 和
  • 对主题组成的观察者的订阅没有取消订阅,因此在通话后的interval可观察量中switchMap不断发出unsubscribe.

试试看:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

setTimeout(() => {
  console.log("subject.unsubscribe()");
  subject.unsubscribe();
}, 700);
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

这些似乎都不是理想的行为,所以要求unsubscribea Subject是可以避免的.

相反,您的代码段中的代码应使用调用Subscription返回的内容取消订阅subscribe:

const subscription = subject.subscribe((v) => {
  console.log(v);
});
subscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

继写这个答案,我发现了以下评论来自本·莱什,与我的理论,它涉及到处置对象的配合:

如果您希望主题next在完成有用之后大声而愤怒地出错,则可以unsubscribe直接调用主题实例本身.