如何在RXJS中传输异步流?

Roy*_*mir 5 javascript rxjs

我试图了解流如何通过RXjs中的管道传输.
我知道这不应该是一个问题,因为这是异步流的整个想法 - 但仍然有我想要理解的东西.

看看这段代码:

var source = Rx.Observable
    .range(1, 3)
    .flatMapLatest(function (x) {  //`switch` these days...
        return Rx.Observable.range(x*100, 2);
    });


 source.subscribe(value => console.log('I got a value ', value))
Run Code Online (Sandbox Code Playgroud)

结果:

I got a value 100
I got a value 200
I got a value 300
I got a value 301
Run Code Online (Sandbox Code Playgroud)

我相信(IIUC)图表是这样的:(通知标记为101,201,取消订阅)

----1---------2------------------3------------------------------|

????????flatMapLatest(x=>Rx.Observable.range(x*100, 2))????????
-----100-------(-?1?0?1?)-------200---(-?2?0?1?)-----300------301-------------
Run Code Online (Sandbox Code Playgroud)

这是一个问题:

题:

是否始终保证2将在(101)之前到达?同样的那个3是在(201)之前到达的?

我的意思是 - 如果我不想看一个时间线,那么下面的图表发生是完全合法的:

----1---------------2---------------3------------------------------|

????????flatMapLatest(x=>Rx.Observable.range(x*100, 2))????????
-----100-------101------200---201-----300------301-------------
Run Code Online (Sandbox Code Playgroud)

2已经发出101的情况下稍微延迟到达的地方

我在这里错过了什么?管道如何在这里工作?

mar*_*tin 1

对于具有特定 RxJS 版本的特定 Observable 链,排放顺序将始终相同。

如前所述,在 RxJS 4 中,它使用currentThread调度程序,如下所示: https: //github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39
所有调度程序(immediateRxJS 4 中的调度程序除外)都在内部使用某种类型的队列,因此顺序始终相同。

事件的顺序与您在图中显示的非常相似(......或者至少我认为是这样):

  1. 1被调度并发出,因为它是队列中唯一的操作。
  2. 100已安排。此时,调度程序队列中不再有任何操作,因为2尚未进行调度。RangeObservable 在调用 后,onNext()该函数会递归地安排另一次发射。这意味着100之前已经安排好了2
  3. 2已预定
  4. 100已发出,101已安排
  5. 2被发射、101被处置。
  6. ... 等等

请注意,这种行为在 RxJS 4 和 RxJS 5 中是不同的。

在 RxJS 5 中,大多数 Observables 和运算符默认不使用任何调度程序(一个明显的例外是需要处理延迟的 Observables/operator)。因此,在RxJS 5中,RangeObservable不会安排任何内容并立即开始循环发出值。

RxJS 5 中的相同示例将产生不同的结果:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });

source.subscribe(value => console.log('I got a value ', value));
Run Code Online (Sandbox Code Playgroud)

这将打印以下内容:

I got a value  100
I got a value  101
I got a value  200
I got a value  201
I got a value  300
I got a value  301
Run Code Online (Sandbox Code Playgroud)

但是,如果您添加例如delay(0). 常识表明这不应该做任何事情:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });

source.subscribe(value => console.log('I got a value ', value));
Run Code Online (Sandbox Code Playgroud)

现在只有内部RangeObservable被调度并重新处理几次,这使得它只发出最后一个的值RangeObservable

I got a value  300
I got a value  301
Run Code Online (Sandbox Code Playgroud)