创建一个延迟下一个值的Observable

que*_*dus 5 reactive-extensions-js observable rxjs

我正在尝试使用RxJS创建一个可观察到的观察图像.

预期的可观察映射

  • 获取一个值并在获得下一个值之前等待一段固定的时间.
  • 下一个将是等待期间发出的最后一个值,跳过其余的值.
  • 如果等待时间间隔没有发出任何值,则应立即抓取下一个值,如图像的最后一个示例所示.

cwh*_*ris 5

这应该可以解决问题.

var Rx      = require('rx'),
    source  = Rx.Observable.interval(10).take(100),
    log     = console.log.bind(console);

Rx.Observable.create(function (observer) {

    var delaying = false,
        hasValue = false,
        complete = false,
        value;

    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    function sendValue () {
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        setTimeout(callback, 1000); // exercise for the reader. Use a scheduler.
      }
      delaying = true;
    }

    function callback () {
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    return source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );
  })
  .subscribe(log);
Run Code Online (Sandbox Code Playgroud)