ger*_*lus 18 javascript reactive-extensions-js rxjs
如何在浏览器中使用RxJs执行以下方案:
我提出的中间解决方案:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable
.interval(1000)
.delay(5000)
.map(_ => jobQueueData.jobId)
.take(55)
)
.flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
.filter(result => result.completed)
.subscribe(
result => console.log('Result', result),
error => console.log('Error', error)
);
Run Code Online (Sandbox Code Playgroud)
takeUntilflatMap使用此语义正确的?也许整个事情应该被改写而不是被束缚flatMap?Mat*_*ell 32
从顶部开始,你有一个承诺,你变成了一个可观察的.一旦产生一个值,您希望每秒拨一次电话,直到您收到某个响应(成功)或直到经过一定的时间.我们可以将此解释的每个部分映射到Rx方法:
"一旦产生一个值"= map/ flatMap(flatMap在这种情况下,因为接下来的内容也将是可观察的,我们需要将它们展平)
"每秒一次"= interval
"得到一定的回应"= filter
"或"= amb
"已经过了一定的时间"= timer
从那里,我们可以像这样拼凑它们:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
Run Code Online (Sandbox Code Playgroud)
一旦我们获得了初始结果,我们就会将其投射到两个可观察对象之间的竞赛中,一个在收到成功响应时会产生一个值,另一个在经过一定时间后会产生一个值.第二个flatMap是因为.throw在可观察的实例上不存在,并且方法on Rx.Observable返回一个也需要被展平的observable.
事实证明,amb/ timercombo实际上可以替换为timeout,如下所示:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
Run Code Online (Sandbox Code Playgroud)
我省略了.delay你的样本,因为它没有在你想要的逻辑中描述,但它可以适合这个解决方案.
所以,直接回答你的问题:
interval将在订户计数降至零的时刻处理,这将在take(1)或amb/ timeout完成时发生.这里是jsbin我一起测试解决方案(您可以调整返回的值pollQueueForResult以获得所需的成功/超时;为了快速测试,时间被除以10).
来自@ matt-burnell的优秀答案的小优化.您可以使用第一个运算符替换过滤器并使用运算符,如下所示
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.first(x => x.completed)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
);
Run Code Online (Sandbox Code Playgroud)
另外,人们可能不知道,flatMap运营商是一个别名mergeMap在RxJS 5.0.
| 归档时间: |
|
| 查看次数: |
8521 次 |
| 最近记录: |