小编ter*_*kes的帖子

合并可观察序列,其中一些是异步并保留顺序

作为一个例子,我有两个可观察的序列,"data1"和"data2",我希望将它们合并为一个可观察序列,同时保留初始顺序.

当两个可观察序列没有异步工作时 - 这里以小延迟建模 - 这很容易实现Rx.Observable.merge.但是,使用这种方法,任何异步工作都会破坏订单.

是否可以合并可观察序列,使用内置运算符传递已知和等待值尚未知的值?如果没有,我应该建立什么类型的运营商?

'use strict';
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;

var eventEmitter = new EventEmitter();

var end = Rx.Observable.fromEvent(eventEmitter, 'end');

var data1 = Rx.Observable.fromEvent(eventEmitter, 'data1')
  .takeUntil(end);
var data2 = Rx.Observable.fromEvent(eventEmitter, 'data2')
  .takeUntil(end)
  .delay(1000);

Rx.Observable
  .merge(data1, data2)
  .reduce(function(acc, str) {
    return acc + str + ';';
  }, '')
  .subscribe(function(data) {
    console.log(data);
  });

eventEmitter.emit('data1', '1');
eventEmitter.emit('data2', '2');
eventEmitter.emit('data1', '3');
eventEmitter.emit('data1', '4');
eventEmitter.emit('data2', '5');
eventEmitter.emit('end');

// expected: "1;2;3;4;5;"
// actual: "1;3;4;2;5;"
Run Code Online (Sandbox Code Playgroud)

谢谢.

javascript rxjs

2
推荐指数
1
解决办法
1978
查看次数

标签 统计

javascript ×1

rxjs ×1