Hug*_*ira 8 javascript observable rxjs reactivex rxjs5
想象一下,我有以下代码:
let a = Rx.Observable.of(1, 2, 3)
let b = Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))
Run Code Online (Sandbox Code Playgroud)
这会立即输出结果.现在,我如何在每个消息之间设置一个定时延迟作为一种反压方式(注意我不想要一个缓冲区;相反,我想要a并b成为Cold Observables),如:
b.takeEvery(1000).forEach(t => console.log(t))
Run Code Online (Sandbox Code Playgroud)
并得到完全相同的答案:
<wait 1s>
2
<wait 1s>
4
<wait 1s>
6
Run Code Online (Sandbox Code Playgroud)
替代方案:如果RxJS不支持背压(某些可观测量的pull拉机制),那么如何在不耗尽资源的情况下创建无限生成器呢?
备选方案2:支持拉动和推动机制的其他JS框架?
如果是 RxJS 5.x 则不支持背压,但4.x 版本中有例如pausable运算符。它仅适用于热可观察量。有关4.x情况下的背压的更多信息,请参见此处(特别是在底部和 RxJS 相关描述中获取战利品)。
这条 Erik Meijer 的推文可能有点争议但相关:https://twitter.com/headinthebox/status/774635475071934464
对于您自己的背压机制的实现,您需要有 2 路通信通道,可以使用 2 个主题轻松创建该通道 - 每端一个。主要用于next发送消息和.subscribe向另一端列出。
创建一个生成器也是可行的 - 再次使用主题在推式和拉式世界之间建立桥梁。下面是生成斐波那契数的示例性实现。
const fib = () => {
const n = new Rx.Subject()
const f = n
.scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
.map(c => c.a)
return {
$: f,
next: () => n.next()
}
}
const f = fib()
f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
.do(f.next)
.subscribe()Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
<button id='f'>NEXT FIBONACCI</button>
<div id='r'>_?_<div>Run Code Online (Sandbox Code Playgroud)
您可能感兴趣的另一个 js 库是https://github.com/ubolonton/js-csp - 没有使用它,所以不确定它如何处理背压。
| 归档时间: |
|
| 查看次数: |
296 次 |
| 最近记录: |