我试图了解流如何通过RXjs中的管道传输.
我知道这不应该是一个问题,因为这是异步流的整个想法 - 但仍然有我想要理解的东西.
看看这段代码:
var source = Rx.Observable
.range(1, 3)
.flatMapLatest(function (x) { //`switch` these days...
return Rx.Observable.range(x*100, 2);
});
source.subscribe(value => console.log('I got a value ', value))
Run Code Online (Sandbox Code Playgroud)
结果:
I got a value 100
I got a value 200
I got a value 300
I got a value 301
Run Code Online (Sandbox Code Playgroud)
我相信(IIUC)图表是这样的:(通知标记为101,201,取消订阅)
----1---------2------------------3------------------------------|
????????flatMapLatest(x=>Rx.Observable.range(x*100, 2))????????
-----100-------(-?1?0?1?)-------200---(-?2?0?1?)-----300------301-------------
Run Code Online (Sandbox Code Playgroud)
这是一个问题:
题:
是否始终保证2将在(101)之前到达?同样的那个3是在(201)之前到达的?
我的意思是 - 如果我不想看一个时间线,那么下面的图表发生是完全合法的:
----1---------------2---------------3------------------------------|
????????flatMapLatest(x=>Rx.Observable.range(x*100, 2))????????
-----100-------101------200---201-----300------301-------------
Run Code Online (Sandbox Code Playgroud)
在2已经发出101的情况下稍微延迟到达的地方
我在这里错过了什么?管道如何在这里工作?
对于具有特定 RxJS 版本的特定 Observable 链,排放顺序将始终相同。
如前所述,在 RxJS 4 中,它使用currentThread调度程序,如下所示: https: //github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39。
所有调度程序(immediateRxJS 4 中的调度程序除外)都在内部使用某种类型的队列,因此顺序始终相同。
事件的顺序与您在图中显示的非常相似(......或者至少我认为是这样):
1被调度并发出,因为它是队列中唯一的操作。100已安排。此时,调度程序队列中不再有任何操作,因为2尚未进行调度。RangeObservable 在调用 后,onNext()该函数会递归地安排另一次发射。这意味着100之前已经安排好了2。2已预定。100已发出,101已安排2被发射、101被处置。请注意,这种行为在 RxJS 4 和 RxJS 5 中是不同的。
在 RxJS 5 中,大多数 Observables 和运算符默认不使用任何调度程序(一个明显的例外是需要处理延迟的 Observables/operator)。因此,在RxJS 5中,RangeObservable不会安排任何内容并立即开始循环发出值。
RxJS 5 中的相同示例将产生不同的结果:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2);
});
source.subscribe(value => console.log('I got a value ', value));
Run Code Online (Sandbox Code Playgroud)
这将打印以下内容:
I got a value 100
I got a value 101
I got a value 200
I got a value 201
I got a value 300
I got a value 301
Run Code Online (Sandbox Code Playgroud)
但是,如果您添加例如delay(0). 常识表明这不应该做任何事情:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2).delay(0);
});
source.subscribe(value => console.log('I got a value ', value));
Run Code Online (Sandbox Code Playgroud)
现在只有内部RangeObservable被调度并重新处理几次,这使得它只发出最后一个的值RangeObservable:
I got a value 300
I got a value 301
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
118 次 |
| 最近记录: |