使用rxjs时,为什么switchMap不会触发完成事件?

Ric*_*ich 4 javascript reactive-programming rxjs angular

最近,在我的Angular应用程序中,我开始在几种不同的情况下使用rxjs switchMap运算符。我很快意识到,当使用switchMap时,当您订阅此流时,完成块不会触发(我也不认为错误块也会触发)。我在网上看到的所有示例似乎也都无法处理补全问题,我对此感到困惑,原因是什么?

我显然缺少关于switchMap或如何使用它的东西,但是我不知道是什么。

理想情况下,我希望使用触发Http请求的函数来调用函数,然后在错误块中处理错误,然后在完成块中处理请求后的内容。

这是我正在做的事的例子:

export class ResultsComponent {

  ngAfterViewInit() {

    Observable.combineLatest(...filters)
        .debounceTime(500)
        .distinctUntilChanged()
        .switchMap((activeFilters: Array<ActiveFilter>) => {
            const filters = this.mapFilters(activeFilters);
            return this.doSearch(this.term$.getValue(), filters);
        })
        .subscribe((res) => {
           this.onSearchSuccess(res);
        },
        (err) => {
            // THIS NEVER FIRES
            console.error(err);
            this.loading$.next(false);
        ,() => {
            // THIS NEVER FIRES
            this.loading$.next(false);
        });
  }

  private doSearch(input: string, filters: object): Observable<object> {
    return this.searchService.search(input, filters);
  }
}
Run Code Online (Sandbox Code Playgroud)

服务

export class SearchService {

  private baseUrl: string = 'http://mydomainhere.com/api';

  constructor(private http: Http) {}

  public search(input: string, filters: object): Observable<object> {
    const params = {
      "keyword": input,
      "filters": filters
    };
    const url = `${this.baseUrl}/search`;
    return this.http.post(url, params)
       .map(res => res.json())
       .catch(this.handleError);
  }
}
Run Code Online (Sandbox Code Playgroud)

Rob*_*her 10

用内部完成外部可观察

有多种方法可以使外部可观察对象与内部可观察对象完成。(下一节解释了为什么您可能不想这样做,然后是当外部可观察量未完成时检测内部可观察量完成的示例。)

如果你知道你的内部可观察量在完成之前只会发出一个值,就像 API 调用一样,你可以直接通过管道传输first到外部可观察量。

const { of , pipe } = rxjs;
const { switchMap, first } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4)),
    first()
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Run Code Online (Sandbox Code Playgroud)

但是,如果内部可观察值发出多个值,一个简单的更改就是使用endWithtakeWhile告诉外部可观察值何时完成。这假设我们知道内部 observable 永远不会发射null

const { of , pipe } = rxjs;
const { switchMap, endWith, takeWhile } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      endWith(null)
    )),
    takeWhile((x) => x != null)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Run Code Online (Sandbox Code Playgroud)

一个通用的解决方案是Subject当内部可观察对象完成时发出一次,并在主体发出时让外部可观察对象完成,并使用 来监视它takeUntil

const { of , pipe, Subject } = rxjs;
const { switchMap, tap, takeUntil } = rxjs.operators;

const innerComplete = new Subject();

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      tap({
        complete: () => innerComplete.next()
      })
    )),
    takeUntil(innerComplete)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Run Code Online (Sandbox Code Playgroud)

为什么没有完成?

当我第一次开始使用 RxJS 时,我主要是将现有的 API 调用转换为可观察量处理。实际上,这意味着当内部可观察量完成时,外部可观察量也将完成。但值得注意的是,外部可观察量并没有因为内部可观察量而完成。它完成是因为它只会发出一个值。如果它是一个可以发出多个值的可观察值(例如鼠标单击事件),则它不会与内部可观察值一起完成。

这是一件好事。它允许您拥有一个外部可观察量,通过内部可观察量映射其发射,而无需它在内部可观察量第一次完成时完成。例如,假设您想在每次鼠标单击时触发动画,并且该动画由计时器控制。鼠标点击将由外部可观察对象发出。内部可观察对象将运行计时器几秒钟来控制动画。动画完成后,您仍然希望捕获鼠标单击事件,以便动画可以再次启动。

以下代码片段将在每次单击时将一系列数字记录到控制台(我们的临时动画)。由于我们使用的是switchMap之前的“动画”,如果您单击它的中间,它就会停止(concatMap 部分只是在每次发射之间添加了延迟)。switchMap您可以在https://rxmarbles.com/#switchMap的弹珠图中直观地看到这一点

const { of , pipe, fromEvent, Subject } = rxjs;
const { switchMap, concatMap, delay } = rxjs.operators;

const innerComplete = new Subject();

const stream = fromEvent(document, 'click').pipe(
    switchMap(() => of(1, 2, 3).pipe(
      concatMap(x => of(x).pipe(delay(500)))
    ))
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

<p>Click here and watch the console.</p>
Run Code Online (Sandbox Code Playgroud)

根据内在可观察的完成采取行动

鉴于当内部可观察量完成时外部可观察量不需要完成是有道理的,您可能希望有一种方法可以在内部可观察量完成时执行某些操作,而不必完成外部可观察量。tap当您传递观察者作为参数时,将允许您这样做。

const { of , pipe } = rxjs;
const { switchMap, tap } = rxjs.operators;

const stream = of (1, 2, 3).pipe(
    switchMap(() => of (4, 5, 6).pipe(tap({
      complete: () => console.log("Inner observable completed")
    }))))
    .subscribe({
      next: (x) => console.log(x),
      complete: () => console.log('Outer observable completed')
    });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Run Code Online (Sandbox Code Playgroud)


byg*_*ace 5

对于switchMap,除非外部Observable已完成,否则内部Observable的完成不会触发流的完成。这是说明此的示例:

const first = Rx.Observable.interval(2000).take(2)
	.do(console.log.bind(null, 'first next'),
      console.log.bind(null, 'first error'),
      console.log.bind(null, 'first complete'));
const second = Rx.Observable.interval(200).take(2)
	.do(console.log.bind(null, 'second next'),
      console.log.bind(null, 'second error'),
      console.log.bind(null, 'second complete'));

first.switchMap(() => second)
	.subscribe(console.log.bind(null, 'stream next'),
      console.log.bind(null, 'stream error'),
      console.log.bind(null, 'stream complete'));
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

内部可观察对象中引发的错误将调用外部可观察对象上的错误块。这是说明此的示例:

const source = Rx.Observable.interval(2000).take(4)
	.do(console.log.bind(null, 'source next'),
      console.log.bind(null, 'source error'),
      console.log.bind(null, 'source complete'));
const error = Rx.Observable.create((o) => {
  o.error();
}).do(console.log.bind(null, 'error next'),
      console.log.bind(null, 'error error'),
      console.log.bind(null, 'error complete'));

source.switchMap(() => error)
	.subscribe(console.log.bind(null, 'stream next'),
      console.log.bind(null, 'stream error'),
      console.log.bind(null, 'stream complete'));
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

因此,您可以根据自己的喜好来捕获外部的可观察物,并获得错误。

如果要观察内部可观察对象的完成,则必须在switchMap内部观察它。

至于为什么您看不到在线使用完成模块的原因,我不能代表所有人,但就我个人而言,我发现自己在应用程序中并不需要太多。我只是在乎来自的数据next