RxJS:takeUntil()角度组件的ngOnDestroy()

Jef*_*son 32 components rxjs rxjs5 angular

tl; dr:基本上我想将Angular ngOnDestroy与Rxjs takeUntil()运算符结合起来. - 那可能吗?

我有一个Angular组件,可以打开几个Rxjs订阅.当组件被销毁时,需要关闭它们.

一个简单的解决方案是:

class myComponent {

  private subscriptionA;
  private subscriptionB;
  private subscriptionC;

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.subscriptionA = this.serviceA.subscribe(...);
    this.subscriptionB = this.serviceB.subscribe(...);
    this.subscriptionC = this.serviceC.subscribe(...);
  }

  ngOnDestroy() {
    this.subscriptionA.unsubscribe();
    this.subscriptionB.unsubscribe();
    this.subscriptionC.unsubscribe();
  }

}
Run Code Online (Sandbox Code Playgroud)

这有效,但有点多余.我特别不喜欢那样 - 这unsubscribe()是其他地方,所以你必须记住这些是相互关联的. - 组件状态受订阅污染.

我更喜欢使用takeUntil()运算符或类似的东西,使它看起来像这样:

class myComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    const destroy = Observable.fromEvent(???).first();
    this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
    this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
    this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
  }

}
Run Code Online (Sandbox Code Playgroud)

是否存在可以让我使用的破坏事件或类似的东西takeUntil()或其他方式来简化组件架构?我意识到我可以在构造函数中创建一个事件,或者在内部触发ngOnDestroy()但是最终不会使事情变得简单易读.

ols*_*lsn 48

你可以利用a ReplaySubject:

class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceB
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceC
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @EricJ即使你在调用了'ngOnDestroy`之后尝试使用任何observable,`replaySubject`也会帮助保持组件处于被破坏状态.任何延迟订阅都会立即触发`replaySubject`中的重放值并完成. (7认同)
  • 在某种程度上,这不是我想要的——我想避免在组件中创建额外的状态工件(“destroyed$”)并从“ngOnDestroy”触发它。但经过更多研究后我逐渐意识到,没有任何语法糖可以解决这个问题。这绝对是一个比存储所有订阅更好的解决方案。谢谢! (5认同)
  • 在角度团队中已经讨论过如何使组件中的rxjs可以轻松访问destroy事件,但据我所知还没有实现. (2认同)
  • 我会在这里考虑一个`new ReplaySubject(1)`.这样你的组件将保持在被破坏的状态,你确信一切都已完成.除此之外,很好的答案:) (2认同)
  • @Dorus - 此处重播主题与常规主题相比的价值是多少。只要主题完成了,为什么还需要重播功能? (2认同)
  • @Isac 你上面的评论充分解释了这一点,不是吗? (2认同)

Ren*_*ger 12

使用componentDestroyed()npm包中的函数ng2-rx-componentdestroyed是目前使用takeUntil最简单的方法:

@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}
Run Code Online (Sandbox Code Playgroud)

这是一个componentDestroyed()直接包含在您的代码中的版本:

// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}
Run Code Online (Sandbox Code Playgroud)

  • 这种方法的问题是您现在必须扩展一些基类 (3认同)
  • 它是 ReplaySubject 的任何原因吗?- 我通常使用普通主题 (3认同)

mar*_*tin 10

那么,这取决于您关闭订阅的意思.基本上有两种方法可以做到这一点:

  1. 使用完成链的运算符(例如takeWhile()).
  2. 取消订阅源Observable.

很高兴知道这两者并不相同.

例如,当takeWhile()您使用时,您将使操作员发送complete通知,该通知将传播给您的观察者.所以如果你定义:

...
.subscribe(..., ..., () => doWhatever());
Run Code Online (Sandbox Code Playgroud)

然后,当你用例如完成链.takeWhile()doWhatever()函数将被调用.

例如,它可能如下所示:

const Observable = Rx.Observable;
const Subject = Rx.Subject;

let source = Observable.timer(0, 1000);
let subject = new Subject();

source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));

setTimeout(() => {
  subject.next();
}, 3000);
Run Code Online (Sandbox Code Playgroud)

3秒后,将调用所有完整的回调.

另一方面,当您取消订阅时,您说您不再对源Observable生成的项目感兴趣.然而,这并不意味着源必须完成.你根本不在乎.

这意味着您可以Subscription.subscribe(...)呼叫中收集所有s 并立即取消订阅所有这些:

let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);

subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));

setTimeout(() => {
  subscriptions.unsubscribe();
}, 3000);
Run Code Online (Sandbox Code Playgroud)

现在3s延迟后,没有任何内容会打印到控制台,因为我们取消订阅并且没有调用完整的回调.

所以你想要使用的取决于你和你的用例.请注意,取消订阅与完成不同,即使我猜你的情况并不重要.


el-*_*avo 10

Angular 16 提供了一个新的 takeUntilDestroyed 函数,可以在构造函数中像这样使用

import { Component } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent {

  constructor(private http: HttpClient) {
     this.http.get('/api')
       .pipe(takeUntilDestroyed())
       .subscribe();
  }
}
Run Code Online (Sandbox Code Playgroud)

注意如果您尝试在构造函数外部执行相同的操作,您可能会看到此错误takeUntilDestroyed() can only be used within an injection context such as a constructor将此更新修复为以下内容

import { Component, DestroyRef, OnInit, inject } from "@angular/core";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Component({
  selector: "my-component",
  templateUrl: "./my-component.html",
  styleUrls: ["./my-component.scss"]
})
export class MyComponent implements OnInit {
  destroyedRef = inject(DestroyRef);

  ngOnInit(): void {
     this.http.get('/api')
       .pipe(takeUntilDestroyed(this.destroyedRef))
       .subscribe();
  }
}
Run Code Online (Sandbox Code Playgroud)


Kev*_*ker 6

请在 TakeUntil 中使用多态性(2022 年 4 月 13 日)

\n

如果您正在编写protected destroy$ = new Subject<void>();自己制作的每个组件,那么您应该问自己:“为什么我不遵循DRY(Don\xe2\x80\x99t Repeat Yourself)原则?”

\n

要遵循 DRY 原则,请创建一个处理销毁信号的抽象基础组件(抽象类不能直接实例化)。

\n
@Component({ template: \'\' })\nexport abstract class BaseComponent extends Subscribable {\n  // Don\'t let the outside world trigger this destroy signal.\n  // It\'s only meant to be trigger by the component when destroyed! \n  private _destroy = new Subject<void>();\n  public destroy$ = this._destroy as Observable<void>;\n  /** Lifecycle hook called by angular framework when extended class dies. */\n  ngOnDestroy(): void {\n    this._destroy.next();\n  }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

制作一个方便的扩展功能来简化事情。

\n
declare module \'rxjs/internal/Observable\' {\n  interface Observable<T> {\n    dieWith(comp: BaseComponent): Observable<T>;\n  }\n}\n\nObservable.prototype.dieWith = function<T>(comp: BaseComponent): Observable<T> {\n    return this.pipe(takeUntil(comp.destroy$));\n};\n
Run Code Online (Sandbox Code Playgroud)\n

每当您需要处理订阅时,都可以扩展您的 BaseComponent。

\n
@Component({ ... })\nexport class myComponent extends BaseComponent {\n\n  constructor(\n    private serviceA: ServiceA,\n    private serviceB: ServiceB,\n    private serviceC: ServiceC\n  ) {\n    super();\n  }\n\n  ngOnInit() {\n    this.serviceA.a$.dieWith(this).subscribe(...);\n    this.serviceB.b$.dieWith(this).subscribe(...);\n    this.serviceC.c$.dieWith(this).subscribe(...);\n  }\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n

您已经像专业人士一样正式处理了 Angular Components 中的订阅。

\n

您的同事稍后会感谢您!

\n

快乐编码!

\n