来自RxJS的请求流的响应的同步流

jam*_*ref 5 javascript reactive-programming rxjs

我是RxJS的新手,想知道是否有人可以帮助我.

我想从请求流(有效载荷数据)创建一个同步的响应流(最好带有相应的请求).

我基本上希望逐个发送请求,每个请求等待最后一个请求.

我尝试了这个,但它立刻发送了所有内容(jsbin):

var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};
Run Code Online (Sandbox Code Playgroud)

以下工作在某种程度上,但不使用流作为请求数据(jsbin).

var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};
Run Code Online (Sandbox Code Playgroud)

谢谢!

编辑:

只是为了澄清,这就是我想要实现的目标:

"发送A,当您收到A的响应时,发送B,当您收到B的响应时,发送C等......"

使用concatMap和defer,正如user3743222所建议的那样,似乎是这样做的(jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);
Run Code Online (Sandbox Code Playgroud)

use*_*222 3

尝试在您的第一个代码示例中替换flatMapconcatMap,并让我知道结果行为是否符合您正在寻找的内容。

responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);
Run Code Online (Sandbox Code Playgroud)

基本上concatMap与 具有相似的签名flatMap,行为的区别在于它将等待当前可观察对象被展平完成,然后再继续下一个。所以在这里:

  • 一个requestStream值将被推送给concatMap运算符。
  • 运算concatMap符将生成一个sendRequest可观察量,该可观察量中的任何值(似乎是一个元组(val, response))都将通过选择器函数传递,并且其对象结果将传递到下游
  • 完成后,将处理sendRequest另一个值。requestStream
  • 简而言之,您的请求将被一一处理

或者,也许您想使用defer推迟sendRequest.

responseStream = requestStream.concatMap(//I replaced `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
  (val, response)=>{ return {val, response}; }
);
Run Code Online (Sandbox Code Playgroud)