RxJs:轮询直到完成间隔或接收到正确的数据

ger*_*lus 18 javascript reactive-extensions-js rxjs

如何在浏览器中使用RxJs执行以下方案:

  • 将数据提交到队列进行处理
  • 找回工作ID
  • 每1秒轮询另一个端点,直到结果可用或已经过60秒(然后失败)

我提出的中间解决方案:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
Run Code Online (Sandbox Code Playgroud)
  1. 有没有中间变量的方法在数据到达或发生错误时停止计时器?我现在可以介绍新的observable然后使用takeUntil
  2. flatMap使用此语义正确的?也许整个事情应该被改写而不是被束缚flatMap

Mat*_*ell 32

从顶部开始,你有一个承诺,你变成了一个可观察的.一旦产生一个值,您希望每秒拨一次电话,直到您收到某个响应(成功)或直到经过一定的时间.我们可以将此解释的每个部分映射到Rx方法:

"一旦产生一个值"= map/ flatMap(flatMap在这种情况下,因为接下来的内容也将是可观察的,我们需要将它们展平)

"每秒一次"= interval

"得到一定的回应"= filter

"或"= amb

"已经过了一定的时间"= timer

从那里,我们可以像这样拼凑它们:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;
Run Code Online (Sandbox Code Playgroud)

一旦我们获得了初始结果,我们就会将其投射到两个可观察对象之间的竞赛中,一个在收到成功响应时会产生一个值,另一个在经过一定时间后会产生一个值.第二个flatMap是因为.throw在可观察的实例上不存在,并且方法on Rx.Observable返回一个也需要被展平的observable.

事实证明,amb/ timercombo实际上可以替换为timeout,如下所示:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;
Run Code Online (Sandbox Code Playgroud)

我省略了.delay你的样本,因为它没有在你想要的逻辑中描述,但它可以适合这个解决方案.

所以,直接回答你的问题:

  1. 在上面的代码中,不需要手动停止任何操作,因为interval将在订户计数降至零的时刻处理,这将在take(1)amb/ timeout完成时发生.
  2. 是的,原始中的两种用法都是有效的,因为在两种情况下,您都将一个可观察量的每个元素投影到一个新的可观察量中,并且希望将所得到的可观察量的可观察量变成一个常规的可观察量.

这里是jsbin我一起测试解决方案(您可以调整返回的值pollQueueForResult以获得所需的成功/超时;为了快速测试,时间被除以10).


Joe*_*ing 9

来自@ matt-burnell的优秀答案的小优化.您可以使用第一个运算符替换过滤器使用运算符,如下所示

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))

  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );
Run Code Online (Sandbox Code Playgroud)

另外,人们可能不知道,flatMap运营商是一个别名mergeMap在RxJS 5.0.