标签: rxjs-observables

我需要取消订阅 Angular Observable 吗?

令人惊讶的是,很难得到关于是否以及何时应该取消订阅 Angular Observable 的直接答案。

我有以下场景:

this.subscription = this.service.makeHTTPCall(stuff).subscribe(x => {
//do something
});
Run Code Online (Sandbox Code Playgroud)

我看到的解决方案有以下几种:

  1. 不要将订阅存储为变量,这是否意味着我不必取消订阅?

    this.service.makeHTTPCall(stuff).subscribe(x => {
    //do something
    });
    
    Run Code Online (Sandbox Code Playgroud)
  2. 将订阅存储为变量并在 ngOnDestroy 中取消订阅

    ngOnDestroy() {
    if (this.subscription) { this.subscription.unsubscribe(); }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  3. 什么也不做,Angular 会为你整理所有取消订阅的内容

我知道有像 之类的第三方库ng-take-until-destroy,但假设我们没有任何第三方库,这是建议的取消订阅方法吗?

rxjs typescript angular rxjs-observables

5
推荐指数
2
解决办法
8236
查看次数

Angular 清除订阅的更好方法

有很多方法可以在一个组件中有效地处理多个订阅,我在这里有 2 种方法,想知道哪种方法更有效,为什么?

方法一:使用数组

第 1 步:创建数组

private subscriptionArray: Subscription[];
Run Code Online (Sandbox Code Playgroud)

第 2 步:向数组添加订阅

this.subscriptionArray.push(this._storeManagementHttp.createStore(newStore).subscribe(resp => {
  this._toast.success('New store created');
}));
Run Code Online (Sandbox Code Playgroud)

第 3 步:迭代每个订阅和取消订阅

this.subscriptionArray.forEach(subs => subs.unsubscribe());
Run Code Online (Sandbox Code Playgroud)

方法二

第 1 步:创建新订阅

private subscriptions = new Subscription();
Run Code Online (Sandbox Code Playgroud)

第 2 步:添加订阅

this.subscriptions.add(this._storeManagementHttp.createStore(newStore).subscribe(resp => {
  this._toast.success('New store created');
  this._router.navigate(['/store-management']);
}));
Run Code Online (Sandbox Code Playgroud)

Step3:清除订阅

this.subscriptions.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

observable rxjs angular rxjs-subscriptions rxjs-observables

4
推荐指数
2
解决办法
5340
查看次数

RxJs:您可以将运算符作为参数传播到管道运算符中吗

我有两个可观察流,它们执行非常独立的映射逻辑,但最终以以下 3 个运算符结束:

  this.selection
    .pipe(
      ..Custom mapping operators
      tap(_ => this.devicesLoading = true),
      switchMap(d => this.mapService.findLocationForDevices(d)),
      map(loc => marker([loc.latitude, loc.longitude])
    )
    .subscribe(markers => this.plotMarkers(markers));
Run Code Online (Sandbox Code Playgroud)

我想将最后一个tap, switchMap, map运算符移动到一个通用函数,这样我就可以在我的两个可观察流中应用它们。

我想过这样做:

  private resolveLocationsAndConvertToMarkers = (devices: String[]) => [
    tap(_ => this.devicesLoading = true),
    switchMap((devices: string[]) => this.mapService.findLocationForDevices(devices)),
    map(loc => marker([loc.latitude, loc.longitude])
  ];
Run Code Online (Sandbox Code Playgroud)

但我不确定如何将这些运算符扩展到管道参数中,例如:#

      this.selection
        .pipe(
          // Custom mapping operators
          ... this.resolveLocationsAndConvertToMarkers
        )
        .subscribe(markers => this.plotMarkers(markers));
Run Code Online (Sandbox Code Playgroud)

这个错误there are no overloads that expect 3 or 5 arguments...

rxjs rxjs-pipeable-operators rxjs-observables

4
推荐指数
1
解决办法
849
查看次数

如何使用 shareReplay 缓存带有请求参数的 HTTP 响应

我有一个场景,比如我需要在单击文件时获取文件内容。我想用 缓存文件内容 API shareReplay(),如何实现?

fileService.getContent是一个 API 服务,它将通过以下方式获取内容params(repository_id, filePath);

问题:

  • 我应该在哪里使用shareReplay() pipe?内部 API 服务?或者我在下面写的地方。
  • 下面的代码无法按预期工作。该API会被多次触发。如何缓存 API 以便shareReplay()仅调用一次。

成分

fileOpened$ = new Subject();
...

this.fileOpened$.pipe(
    switchMap(file => this.fileService.getContent(this.repository_id, file.filePath)),
       shareReplay(1)
);
Run Code Online (Sandbox Code Playgroud)

服务

getContent(repoId: string, path: string): Observable<string> {
    return this.http.get<string>(
        `/api/xxx/${repoId}/files/${decodeURIComponent(path)}`,
        {
            responseType: 'text' as 'json'
        });
}
Run Code Online (Sandbox Code Playgroud)

angular rxjs6 rxjs-observables

4
推荐指数
1
解决办法
4662
查看次数

如何在计时器中继续catchError(rxjs)

我有一个服务负责使用计时器每 x 秒执行一次 httpClient.get 。我需要这个计时器在服务启动时开始运行,因此计时器是在服务 构造函数中定义的。根据我的理解,订阅应该在计时器作用域中注册,如下所示(如果不需要,我不想更改它,除非它不正确)。

只要后端服务器没有出现错误\异常\错误 500 异常,则所有系统都工作正常。现在,我需要两件事:

  1. 每当后端服务器出现问题时,我想捕获Error 。
  2. 我希望观察者能够根据计时器时间(到下一个滴答声)继续运行,即使存在异常。每当出现异常时,我的最终结果应该是到达组件中的 popUpAlert 请参阅我的代码 - 这是 webapi 控制器:
public IActionResult getSomeErrorAsTest()
{
    try
    {
        throw new Exception("Serer error");
    }
    catch(Exception ex)
    {
        return StatusCode(StatusCodes.Status500InternalServerError, new List<string>());
        //throw ex;
    }
}
Run Code Online (Sandbox Code Playgroud)

这就是服务(假设每个 get 请求中的数据都会发生变化 - 如果确实如此,则无需实现):

export class MyService
{
    MyDataSubject = new Subject<any[]>();
    MyDataChanged :Observable>any[]> = this.MyDataSubject.asObservable();
    
    subscribe :Subscription;
    constructor(private httpClient : HttpClient)
    {
        this.subscribe = timer(0, 30000).pipe(
        switchMap(()=>
            this.getData())).subscribe();
    }
    getData()
    {
        return this.httpClient.get<any[]>(<controller …
Run Code Online (Sandbox Code Playgroud)

timer rxjs angular rxjs-observables rxjs-pipe

4
推荐指数
1
解决办法
2364
查看次数

如何取消 RXJS Effects 中发出的角度 http 请求

我想取消 Angular 8 中 RXJS 效果中发出的 http 请求。

@Effect() getReport$ = this.action$.pipe(
ofType(ActionTypes.GET_WIDGET),   
map(toPayload),
mergeMap(payload => {
  return this.dashboardService.getReport(payload).pipe(
    map(this.extractData),
    switchMap(result => {
      return observableOf(<ReceivedWidgetAction>{
        type: ActionTypes.RECEIVED_WIDGET,
        payload: result
      });
    }),
    catchError((err: Response | any) => {
      return this.handleReportError(err);
    }));
    
}));
Run Code Online (Sandbox Code Playgroud)

请让我知道如何使用 Angular 8 执行相同操作。另请注意,我将无法使用切换映射,因为多个 Angular 组件将使用不同的有效负载调用此操作。

typescript ngrx-effects ngrx-store angular8 rxjs-observables

3
推荐指数
1
解决办法
3552
查看次数


如何返回具有回调中值的可观察值?

我正在编写一项服务,我打算存储Place对象的本地副本,并仅当它们未存储在本地时才从后端获取它们。但是,我在实现此功能时遇到了困难。fetchPlace()如果来自的值place()未定义我可以设置我的页面来调用,但我打算保持fetchPlace()私有,以便稍后我可以实现一个系统来检查最近是否发出了请求,以便如果用户快速切换页面。

地点.service.ts

export class PlacesService {
  private _places = new BehaviorSubject<Place[]>([]);
  get places() {
    return this._places.asObservable();
  }

  constructor(private _http: HttpClient) {}

  place(placeId: number): Observable<Place> {
    return this._places.pipe(
      take(1),
      map((places: Place[]) => {
        console.log(places);
        let place = places.find((place: Place) => place.id === placeId);

        if (place === undefined) {
          console.log('Time to send a request!');
          this.fetchPlace(placeId).subscribe(
            (fetchedPlace: Place) => {
              console.log('We got one!');
              place = fetchedPlace;
              console.log(place);
            },
            (error) => {
              console.error('Looks like …
Run Code Online (Sandbox Code Playgroud)

rxjs angular rxjs-observables

3
推荐指数
1
解决办法
1247
查看次数

角度、可观察的管道限制?

我正在我的一名路线守卫身上做这件事......

canActivate(route: ActivatedRouteSnapshot, state: RouterStateSnapshot): Observable<boolean> {
    // go through each check sequentially, stop process if one does throwError
    return of(true).pipe( // Is there a limit to operators in a pipe???
      switchMap(() => of(true)), // simplified for this post...usually call a function that returns
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      switchMap(() => of(true)),
      // if any function returns throwError, we skip other checks
      catchError(() => of(false))
    );
  } …
Run Code Online (Sandbox Code Playgroud)

visual-studio-code angular rxjs-observables

3
推荐指数
1
解决办法
914
查看次数

取消订阅仅在必要时发出一次的可观察计时器?

在我的程序中,我有一些timer(1000).subscribe()实例以及一些timer(1000, 1000).subscribe()部分。

我遇到了一些内存泄漏问题,想知道是否可以通过取消订阅计时器来缓解这些问题。取消订阅循环计时器似乎是直接且必要的,但我是否还必须取消订阅仅发出一次的计时器?

我的问题的第二部分是是否有更好的方法来取消订阅发射计时器而不是将其放入变量中,如下所示:

const myTimer = timer(1000).subscribe(() => {
    myTimer.unsubscribe();
});
Run Code Online (Sandbox Code Playgroud)

谢谢!

node.js observable rxjs typescript rxjs-observables

2
推荐指数
1
解决办法
732
查看次数