RxJS 6-取消/结束管道

Roo*_*kie 7 tap pipe rxjs angular

使用新版本的RxJS 6,尤其是管道运算符。当前,使用管道来获取API调用的结果,并将其传递给一系列其他任务。

一切正常,但在遇到问题时似乎无法找到取消或终止管道的方法。例如,我正在使用tap运算符检查该值是否为null。然后,我抛出一个错误,但是管道仍然似乎移至下一个任务,在本例中为concatmap。

因此,如何过早结束或取消管道?提前致谢。

getData(id: String): Observable<any[]> {
return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
   tap(evt => {
    if (evt == null) {
      return throwError(new Error("No data found..."));
    }
  }),
concatMap(
  evt =>
     <Observable<any[]>>(
        this.http.get<any[]>(
    `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).map(res =>( {"response1":evt, "response2":res}) )
 )
),
retry(3),
catchError(this.handleError("getData", []))
);}
Run Code Online (Sandbox Code Playgroud)

Ben*_*ing 8

您还可以使用信号Subject和 rxjs 运算符取消/结束管道:takeUntil

例子

httpGetSafe(path: string): Observable<Data> {
  const stopSignal$ = new Subject();

  return this.http.get<Data>(path).pipe(
    map(data => {
      const isBad = data === null;
      if (isBad) {
        stopSignal$.next();
      }
      return data;
    }),
    takeUntil(stopSignal$)
  );
}
Run Code Online (Sandbox Code Playgroud)

有时它比抛出错误更简单,更灵活......


exc*_*ion 7

如果您只想结束管道的执行而不抛出错误,则可以返回 EMPTY。

myObservable.pipe(
  switchMap(c => c === null ? EMPTY: of(c)),
  tap(() => console.log('If c is null this is not executed anymore'))
);
Run Code Online (Sandbox Code Playgroud)

takeWhile 运算符也是一个选项。

myObservable.pipe(
  takeWhile(c => c !== null),
  tap(() => console.log('If c is null this is not executed anymore'))
);
Run Code Online (Sandbox Code Playgroud)

两种方法都可以直接完成,无需走下坡路!


Deb*_*ahK 6

我尝试了这种Stackblitz所具有的基本概念,并成功了。它取消了剩余的操作。请参阅下面的链接。

https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts

我在您的代码和我的代码之间看到的区别是,我使用了throw而不是throwError(您写的是什么?),而我只是抛出错误……不返回抛出的错误。

这是供参考的代码:

import { Component } from '@angular/core';
import { of, from } from 'rxjs';
import { map, catchError, tap, retry} from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
    of('a', 'b', 'c', 'd')
      .pipe(
       map(x => {
        if (x === 'c') {
          throw 'An error has occurred';
        }
        return x;
       }),
       tap(x => console.log('In tap: ', x)),
       retry(3),
       catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));
  }
}
Run Code Online (Sandbox Code Playgroud)


Tie*_*udt 6

RXJS 可以抛出错误并停止执行。您在管道中应用的运算符只是错误的选择。

您正在使用的“tap”运算符仅用于应用副作用,即更改 DOM、console.log,或更改组件上某些变量的值(即这个.counter = someValue)。tap 操作符并不打算更改 RXJS 'stream' - 它只会返回与接收到的相同的 observable。https://rxjs-dev.firebaseapp.com/api/operators/tap

另一方面,在流上工作的运算符,如“map”,可能会引发错误。看到这个堆栈闪电战

总结一下,代码是

getData(): Observable<any[]> {
    return this.httpGet('basePath').pipe(
      map(evt => {
        if (evt === null) {
          return throwError(new Error("No data found..."));
        } else {
          return evt;
        }
      }),
      concatMap(evt => {
        return this.httpGet(`childPath/${evt.serverResult}`); 
      }),
      map(res => {
        return {
          "response2": res.serverResult
        };
      }),
      retry(3),
      catchError(error => {
        console.log('error handled!');
        return of(null);
      })
    )
  }

  private httpGet(someString: string): Observable<{ serverResult: number }> {
    return timer(1000).pipe( // fake a server call that takes 1 second
      map(_ => { // fake a server result
      // Comment out the valid return and uncomment the null return to see it in action
        //return null;
        return { 
          serverResult: 1 
        }
      }) 
    );
  }
Run Code Online (Sandbox Code Playgroud)

如果您不想将错误转换为流中的有效值,请在 subscribe 函数的错误回调中处理它。