switchMap 通过 RxJs 中的属性来区分

cez*_*ezn 4 javascript rxjs ngrx redux-observable switchmap

比方说,我有一系列行动。每个动作都被分配了一些id。像这样:

const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });
Run Code Online (Sandbox Code Playgroud)

现在,对于每个操作,我想在 switchMap 中执行一些逻辑:

actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);
Run Code Online (Sandbox Code Playgroud)

问题在于每个发出的操作都会取消之前的“某些可取消的逻辑”。

是否可以根据操作id (最好是运算符)取消“某些可取消逻辑” ?就像是:

actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)
Run Code Online (Sandbox Code Playgroud)

本质上,switchMap的当前行为
1. actions$发出 id #1。switchMap订阅嵌套的 observable。
2. actions$发出 id #2。switchMap取消订阅先前的嵌套可观察值。订阅新的。
3. actions$发出 id #1。switchMap再次取消订阅先前的嵌套可观察值。订阅新的。

预期行为
1. actions$发出 id #1。switchMap订阅嵌套的 observable。
2. actions$发出 id #2。switchMap再次订阅嵌套的 observable(这次是#2)。区别在于,它不会取消 #1 中的那个
3. actions$发出 id #1。switchMap取消订阅 #1 的嵌套可观察值。再次订阅,#1。

bry*_*n60 8

这似乎是 mergeMap 运算符的一个用例。switchMap 的用例是仅维护一个内部订阅并取消以前的订阅,这不是您在这里所追求的。您需要多个内部订阅,并且希望它们在同一 id 的新值出现时取消,因此需要实现一些自定义逻辑来做到这一点。

大致如下:

action$.pipe(
  mergeMap(val => {
    return (/* your transform logic here */)
              .pipe(takeUntil(action$.pipe(filter(a => a.id === val.id)))); // cancel it when the same id comes back through, put this operator at the correct point in the chain
  })
)
Run Code Online (Sandbox Code Playgroud)

您可以通过编写自定义运算符将其变成可重复使用的东西:

import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';

export function switchMapBy<T, R>(
  key: keyof T,
  mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
  return input$ => input$.pipe(
    mergeMap(val => 
      from(mapFn(val)).pipe(
        takeUntil(input$.pipe(filter(i => i[key] === val[key])))
      )
    )
  );
}
Run Code Online (Sandbox Code Playgroud)

并像这样使用它:

action$.pipe(
  switchMapBy('id', (val) => /* your transform logic here */)
);
Run Code Online (Sandbox Code Playgroud)

这是它的闪电战行动:https://stackblitz.com/edit/rxjs-x1g4vc?file =index.ts