RxJS Operator 延迟处理后续事件

ita*_*fna 7 javascript observable rxjs typescript angular

我正在寻找一个 RxJS 运算符(或运算符组合),它可以让我实现这一目标:

  1. 由可观察对象触发的每个事件都应按其触发顺序进行处理(不得跳过、限制、消除任何值)。
  2. 我应该能够定义处理两个后续事件之间的设定间隔。
  3. 如果当前没有处理先前的事件,则应立即处理下一个事件(无延迟)。

我创建了以下示例,希望能让这一点更清楚:

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";
import { delay } from "rxjs/operators";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
  counter = 1;
  displayedValue = 1;

  valueEmitter = new Subject<number>();
  valueEmitted$ = this.valueEmitter.asObservable();

  ngOnInit() {
    // I want to allow each incremented value to be displayed for at least 2 seconds
    this.valueEmitted$.pipe(delay(2000)).subscribe((value) => {
      this.displayedValue = value;
    });
  }

  onClick() {
    const nextCounter = ++this.counter;
    this.counter = nextCounter;
    this.valueEmitter.next(nextCounter);
  }
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,我希望 中递增的每个值都counter显示为displayedValue至少 2 秒。

因此,用户第一次单击时,onClick显示的值应立即从 1 更改为 2,但如果用户继续快速递增,则应counterdisplyedValue延迟的方式跟随,允许每个先前递增的数字在更改之前显示 2 秒,慢慢追上当前计数器值。

这可以通过 RxJs 运算符实现吗?

更新

目前我想出了以下解决方案

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
  counter = 1;
  displayedValue = 1;
  currentDisplayedValueStartTime = 0;
  readonly DISPLAY_DURATION = 2000;
  valuesToDisplay = [];
  displayedValueSwitcherInterval: number | null = null;

  valueEmitter = new Subject<number>();
  valueEmitted$ = this.valueEmitter.asObservable();

  ngOnInit() {
    this.valueEmitted$.pipe().subscribe((value) => {
      this.handleValueDisplay(value);
    });
  }

  onClick() {
    const nextCounter = ++this.counter;
    this.counter = nextCounter;
    this.valueEmitter.next(nextCounter);
  }

  handleValueDisplay(value) {
    this.valuesToDisplay.push(value);

    if (!this.displayedValueSwitcherInterval) {
      this.displayedValue = this.valuesToDisplay.shift();

      this.displayedValueSwitcherInterval = window.setInterval(() => {
        const nextDiplayedValue = this.valuesToDisplay.shift();
        if (nextDiplayedValue) {
          this.displayedValue = nextDiplayedValue;
        } else {
          clearInterval(this.displayedValueSwitcherInterval);
          this.displayedValueSwitcherInterval = null;
        }
      }, this.DISPLAY_DURATION);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

这不是我想要的,因为它依赖于组件状态并setInterval管理通知队列。我希望找到一些更干净、更具声明性的东西,它依赖于 RxJs 运算符来处理我的通知流及其间隔队列 - 但至少我让该功能完全按照我想要的方式工作。仍然愿意接受有关如何改进这一点的建议。

And*_*tej 6

我认为这可能是解决这个问题的一种方法:

this.valueEmitted$
  .pipe(
    concatMap(
      (v) => concat(
        of(v),
        timer(2000).pipe(ignoreElements())
      )
    )
  )
  .subscribe((value) => {
    this.displayedValue = value;
  });
Run Code Online (Sandbox Code Playgroud)

因为timer还会发出一些值(0、1、2 等),所以我们使用ignoreElements这些值是因为我们对这些值不感兴趣,我们想要的只是timer完整的通知,该通知将指示可以订阅下一个内部可观察值的时刻。请注意,前面提到的内部可观察量是通过以 为参数调用 的concatMap回调函数来创建的v

工作演示。


由可观察对象触发的每个事件都应按其触发顺序进行处理(不得跳过、限制、消除任何值)。

这是在 的帮助下确保的concatMap

我应该能够定义处理两个后续事件之间的设定间隔。

我们可以通过以下方式实现这一目标:

concat(
  // First, we send the value.
  of(v),
  // Then we set up a timer so that in case a new notification is emitted, it will have to
  // wait for the timer to end.
  timer(2000).pipe(ignoreElements())
)
Run Code Online (Sandbox Code Playgroud)

如果当前没有处理先前的事件,则应立即处理下一个事件(无延迟)。

如果 没有创建活动的内部可观察量concatMap,则该值v将立即发送给消费者。


这也可以工作:

this.valueEmitted$
  .pipe(
    concatMap(
      (v) => timer(2000).pipe(
        ignoreElements(), startWith(v)
      )
    )
  )
  .subscribe((value) => {
    this.displayedValue = value;
  });
Run Code Online (Sandbox Code Playgroud)