类似于exhaustMap的运算符,但它会记住源中最后跳过的值并最终执行它

And*_* II 6 rxjs reactivex

我需要一个类似于 的运算符exahustMap,但它会记住最后一个跳过的可观察值,并在当前可观察值完成后执行它。

例如,考虑以下的大理石图exhaustMap

ExhaustMap大理石图

在我的例子中,蓝色值发出后,后面会跟着三个值 50。当然,在这种情况下它看起来就像concatMap,但如果 3 和 5 之间还有一个 4,它就不会反映在输出。

exhaustMap我已经设法编写了自己的运算符,其实现方式类似于:

function exhaustLatestMap<T, R>(project: (value: T) => Subscribable<R>): OperatorFunction<T, R> {
    return source => new Observable<R>(observer => 
        source.subscribe(new ExhaustLatestMapOperatorSubscriber(observer, project)));
}

class ExhaustLatestMapOperatorSubscriber<T, R> implements Observer<T> {

    constructor(
        private observer: Subscriber<R>,
        private project: (value: T) => Subscribable<R>) { }

    innerSub: AnonymousSubscription = null;
    latestValue: T;

    next(value: T) {
        this.processNext(value);
    }

    error(err) {
        this.observer.error(err);
    }

    complete() {
        this.observer.complete();
    }

    private processNext(value: T) {
        this.latestValue = value;
        if (!this.innerSub) {
            this.innerSub = this.project(value).subscribe({
                next: v => this.observer.next(v),
                error: err => {
                    this.observer.error(err);
                    this.endInnerSub(value)
                },
                complete: () => {
                    this.endInnerSub(value);
                }
            });
        }
    }

    private endInnerSub(value: T) {
        this.innerSub.unsubscribe();
        this.innerSub = null;
        if (this.latestValue !== value) {
            this.processNext(this.latestValue);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有一种方法可以通过重用和组合现有的运算符来实现它。有任何想法吗?

car*_*ant 3

可以仅使用内置工厂和操作符来实现它。然而,AFAICT,如果不管理某些每个订阅的状态就无法完成。

幸运的是,defer工厂函数使得管理每个订阅的状态相对简单和安全。而且,除了帮助管理每个订阅状态之外,defer还可以用作订阅可观察对象时收到通知的机制。

另一种实现方式:

const {
  concat,
  defer,
  EMPTY,
  merge,
  of
} = rxjs;

const {
  delay,
  mergeMap,
  tap
} = rxjs.operators;

const exhaustMapLatest = project => source => defer(() => {
  let latestValue;
  let hasLatestValue = false;
  let isExhausting = false;
  const next = value => defer(() => {
    if (isExhausting) {
      latestValue = value;
      hasLatestValue = true;
      return EMPTY;
    }
    hasLatestValue = false;
    isExhausting = true;
    return project(value).pipe(
      tap({ complete: () => isExhausting = false }),
      s => concat(s, defer(() => hasLatestValue ?
        next(latestValue) :
        EMPTY
      ))
    );
  });
  return source.pipe(mergeMap(next));
});

const source = merge(
  of(0).pipe(delay(0)),
  of(1000).pipe(delay(1000)),
  of(1100).pipe(delay(1100)),
  of(1200).pipe(delay(1200)),
  of(2000).pipe(delay(2000))
);

source.pipe(
  exhaustMapLatest(value => merge(
    of(`${value}:0`).pipe(delay(0)),
    of(`${value}:150`).pipe(delay(150)),
    of(`${value}:300`).pipe(delay(300))
  ))
).subscribe(value => console.log(value));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

此实现与您的实现之间存在一些行为差异:

  • 此实现使用hasLatestValue标志而不是相等检查,因此如果最新值等于初始值,则仍会进行投影。
  • 通过此实现,如果结果可观察值的订阅者取消订阅,则对预计可观察值的任何订阅也将被取消订阅。通过您的实施,内部订阅将保持订阅状态 - AFAICT - 直到预计的可观察量完成或出现错误。

我并不是主张应该以这种方式实施。答案只是为了展示一种替代实现。