如何在RxJS5中应用定时背压?

Hug*_*ira 8 javascript observable rxjs reactivex rxjs5

想象一下,我有以下代码:

let a = Rx.Observable.of(1, 2, 3)
let b = Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))
Run Code Online (Sandbox Code Playgroud)

这会立即输出结果.现在,我如何在每个消息之间设置一个定时延迟作为一种反压方式(注意我不想要一个缓冲区;相反,我想要ab成为Cold Observables),如:

b.takeEvery(1000).forEach(t => console.log(t))
Run Code Online (Sandbox Code Playgroud)

并得到完全相同的答案:

<wait 1s>
2
<wait 1s>
4
<wait 1s>
6
Run Code Online (Sandbox Code Playgroud)

替代方案:如果RxJS不支持背压(某些可观测量的pull拉机制),那么如何在不耗尽资源的情况下创建无限生成器呢?

备选方案2:支持拉动和推动机制的其他JS框架?

art*_*iak 3

如果是 RxJS 5.x 则不支持背压,但4.x 版本中有例如pausable运算符。它仅适用于热可观察量。有关4.x情况下的背压的更多信息,请参见此处(特别是在底部和 RxJS 相关描述中获取战利品)。

这条 Erik Meijer 的推文可能有点争议但相关:https://twitter.com/headinthebox/status/774635475071934464

对于您自己的背压机制的实现,您需要有 2 路通信通道,可以使用 2 个主题轻松创建该通道 - 每端一个。主要用于next发送消息和.subscribe向另一端列出。

创建一个生成器也是可行的 - 再次使用主题在推式和拉式世界之间建立桥梁。下面是生成斐波那契数的示例性实现。

const fib = () => {
  const n = new Rx.Subject()
  const f = n
    .scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
    .map(c => c.a)
    
  return {
    $: f,
    next: () => n.next()
  }
}

const f = fib()

f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
  .do(f.next)
  .subscribe()
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

<button id='f'>NEXT FIBONACCI</button>

<div id='r'>_?_<div>
Run Code Online (Sandbox Code Playgroud)

您可能感兴趣的另一个 js 库是https://github.com/ubolonton/js-csp - 没有使用它,所以不确定它如何处理背压。