RxJS 运算符在事件流中等待安静期,但在事件流繁忙的情况下不会永远等待

cod*_*ape 5 javascript rxjs

场景:

  • 我有一个事件流,每个事件都应该导致信息的更新显示(事件流来自 websockets,显示在 highcharts 图表中,但这并不重要)
  • 出于性能原因,我不想为每个事件触发 UI 更新。
  • 我宁愿做以下事情:

    • 当我收到事件时,我只想更新 UI,距离上次更新已超过 X 毫秒
    • 但是,如果有任何传入事件,我无论如何都想每隔 Y 毫秒 (Y > X) 进行一次更新
    • 因此,我正在寻找某种(组合)RxJS 运算符,该运算符将限制事件流的速率,仅在出现安静期(或超过等待安静期最大时间)时才发出事件。
    • 也就是说,我想等待安静的时期,但不是永远。

我怎样才能实现上面描述的内容?

我看过:

car*_*ant 3

debounce您可以通过在通知程序可观察值的组合中使用和采用两个计时器来编写一个运算符来执行您想要的操作:

  • 一个计时器,在源发出值后发出 X 毫秒;和
  • 一个计时器,在操作符返回的可观察值发出值后 Y 毫秒发出。

请参阅下面的片段。里面的评论应该解释它是如何工作的。

const {
  ConnectableObservable,
  merge,
  MonoTypeOperatorFunction,
  Observable,
  of,
  Subject,
  Subscription,
  timer
} = rxjs;

const {
  concatMap,
  debounce,
  mapTo,
  publish,
  startWith,
  switchMap
} = rxjs.operators;

// The pipeable operator:

function waitUntilQuietButNotTooLong(
  quietDuration,
  tooLongDuration
) {

  return source => new Observable(observer => {

    let tooLongTimer;
    
    // Debounce the source using a notifier that emits after `quietDuration`
    // milliseconds since the last source emission or `tooLongDuration`
    // milliseconds since the observable returned by the operator last
    // emitted.

    const debounced = source.pipe(
      debounce(() => merge(
        timer(quietDuration),
        tooLongTimer
      ))
    );

    // Each time the source emits, `debounce` will subscribe to the notifier.
    // Use `publish` to create a `ConnectableObservable` so that the too-long
    // timer will continue independently of the subscription from `debounce`
    // implementation.

    tooLongTimer = debounced.pipe(
      startWith(undefined),
      switchMap(() => timer(tooLongDuration)),
      publish()
    );

    // Connect the `tooLongTimer` observable and subscribe the observer to
    // the `debounced` observable. Compose a subscription so that
    // unsubscribing from the observable returned by the operator will
    // disconnect from `tooLongTimer` and unsubscribe from `debounced`.

    const subscription = new Subscription();
    subscription.add(tooLongTimer.connect());
    subscription.add(debounced.subscribe(observer));
    return subscription;
  });
}

// For a harness, create a subject and apply the operator:

const since = Date.now();
const source = new Subject();
source.pipe(
  waitUntilQuietButNotTooLong(100, 500)
).subscribe(value => console.log(`received ${value} @ ${Date.now() - since} ms`));

// And create an observable that emits at a particular time and subscribe
// the subject to it:

const emissions = of(0, 50, 100, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, 1050, 1100, 1150);
emissions.pipe(
  concatMap((value, index) => timer(new Date(since + value)).pipe(
    mapTo(index)
  ))
).subscribe(source);
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)