rxjs使用可变响应时间定期轮询端点

act*_*cay 6 observable rxjs

我希望轮询端点的速度不会超过每秒一次,并且不会慢于轮询端点所需的时间.永远不应该有多个未完成的请求.

我想要一种反应式编程方式每秒至少轮询一次端点,但如果端点花费的时间超过1秒,则下一个请求会立即触发.

在下面的大理石图中,第2次和第3次请求的时间超过1秒,但第4次和第5次请求更快完成.下一个请求在1秒边界上触发,或者在从最后一个未完成请求获取数据后立即触发.

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events
Run Code Online (Sandbox Code Playgroud)

我试图在大理石图中使术语正确,所以我假设端点请求的开头应该是大理石我标记为"r",而标记为"d"的大理石事件具有端点数据.

这是我用普通的js做了多少代码.但是,根据我上面的要求,随后的请求不会立即触发.

    var poll;
    var previousData;
    var isPolling = false;
    var dashboardUrl = 'gui/metrics/dashboard';
    var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

    return {
        startInterval: startInterval,
        stopInterval: stopInterval
    };

    function startInterval() {
        stopInterval();
        tryPolling(); // immediately hit the dashboard
        // attempt polling at the interval
        poll = $interval(tryPolling, intervalMs);
    }

    /**
     * attempt polling as long as there is no in-flight request
     * once the in-flight request completes or fails, allow the next request to be processed
     */
    function tryPolling() {
        if (!isPolling) {
            isPolling = true;

            getDashboard()
            // if the dashboard either returns successful or fails, reset the polling boolean
                .then(resetPolling, resetPolling);
        }
    }

    /** there's no longer an in-flight request, so reset the polling boolean */
    function resetPolling() {
        isPolling = false;
    }

    function stopInterval() {
        if (poll) {
            $interval.cancel(poll);
            poll = undefined;
        }
    }

    function getDashboard() {
        return restfulService.get(dashboardUrl)
            .then(updateDashboard);
    }

    function updateDashboard(data) {
        if (!utils.deepEqual(data, previousData)) {
            previousData = angular.copy(data);
            $rootScope.$broadcast('$dashboardLoaded', data);
        }
    }
Run Code Online (Sandbox Code Playgroud)

car*_*ant 5

这是我的解决方案。它使用内部主题,combineLatestfilter确保如果响应比timer周期慢,则请求不会累积。

评论应说明其工作原理。

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
    return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

function poll() {

  return Rx.Observable.defer(() => {

    // Use defer so that the internal subject is created for each
    // subscription.
    const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

    return Rx.Observable
    
      // Combine the timer and the subject's state.
      .combineLatest(
        Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
        subject
      )

      // Filter out combinations in which either a more recent tick
      // has not occurred or a request is pending.
      .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: true }))
      
      // Make the request and use the result selector to combine
      // the tick and the response.
      .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: false }))
      
      // Map the response.
      .map(([tick, resp]) => resp);
  });
}

poll().take(delays.length).subscribe(r => console.log(r));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)


我只是想到有一个运算符可以做到这一点:exhaustMap

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
  return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

const poll = Rx.Observable
  .timer(0, 1000)
  .do(tick => console.log("tick", tick))
  .exhaustMap(() => mock());

poll.take(delays.length).subscribe(r => console.log(r));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

  • 将答案更新为使用`exhaustMap`-使用该解决方案很简单。 (2认同)