如何阻止去抖动的 Rxjs Observable?

Ite*_*tor 8 javascript rxjs angular

我创建了一个 observable,它将在最后一次更改后 3 秒触发,并调用publishChange服务的 。它有效,但我想创建一个doImmediateChange函数,该函数publishChange立即调用并停止去抖动的 observable。这怎么可能?

我的组件:

class MyComponent {
    private updateSubject = new Subject<string>();

    ngOnInit() {
        this.updateSubject.pipe(
            debounceTime(3000),
            distinctUntilChanged()
        ).subscribe(val => {
            this.srv.publishChange(val);
        });
    }

    doChange(val: string) {
        this.updateSubject.next(val);
    }

    doImmediateChange(val: string) {

        // Stop the current updateSubject if debounce is in progress and call publish immediately
        // ??
        this.srv.publishChange(val);

    }

}
Run Code Online (Sandbox Code Playgroud)

fri*_*doo 9

您可以模拟debounceTime使用switchMapdelay。然后取消内部 ObservabletakeUntil以防止发出等待值。

private updateSubject = new Subject<string>();
private interrupt = new Subject();

ngOnInit() {
  this.updateSubject.pipe(
    switchMap(val => of(val).pipe(
      delay(3000),
      takeUntil(this.interrupt)
    ))
  ).subscribe(val => publish(val));
}

doChange(val: string) {
  this.updateSubject.next(val);
}

doImmediateChange(val: string) {
  this.interrupt.next();
  publish(val);
}
Run Code Online (Sandbox Code Playgroud)

https://stackblitz.com/edit/rxjs-ya93fb


Ric*_*unn 6

使用竞赛运算符

第一个完成的可观察量成为唯一订阅的可观察量,因此此递归函数将在一次发射后完成take(1),然后重新订阅() => this.raceRecursive()

private timed$ = new Subject<string>();
private event$ = new Subject<string>();

ngOnInit() {
  this.raceRecursive()
}

raceRecursive() {
  race(
    this.timed$.pipe(debounceTime(1000)),
    this.event$
  )
    .pipe(take(1)) // force it to complete
    .subscribe(
      val => console.log(val), // srv call here
      err => console.error(err),
      () => this.raceRecursive() // reset it once complete
    )
}

doChange(val: string) {
  this.timed$.next(val)
}

doImmediateChange(val: string) {
  this.event$.next(val)
}
Run Code Online (Sandbox Code Playgroud)