Rxjs:实现队列

Bar*_*urg 5 rxjs

使用promises,实现队列以防止例如并行运行多个HTTP请求非常容易:

class Runner {
  private promise;
  constructor(http) {
    this.promise = q.resolve();
  }
  getUrl() {
    return this.promise = this.promise.then(() => http.get('http://someurl'))
  }
}

var runner = new Runner(http);

var lastPromise;
for (var i = 0; i < 10; i++) {
  lastPromise = runner.getUrl();
}

lastPromise.then(() => console.log("job's done!");
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚如何在Rxjs中做到这一点.如果我尝试类似于上面的内容,那么当我添加请求时,所有先前的HTTP调用都会重复,因为它只是添加到流中并重新运行整个事务.

我读了一些关于队列调度程序的东西,但这似乎不再存在(不再)了?

Mar*_*ten 16

您可以使用像@cartant建议的concat:

const urlQueue = Observable.fromPromise(http.get('http://someurl'))
  .concat(Observable.fromPromise(http.get('http://someurl')))
  .concat(Observable.fromPromise(http.get('http://someurl')));
Run Code Online (Sandbox Code Playgroud)

但是你需要在订阅之前构造这样的流并让队列处理它.也; fromPromise仍然渴望所以你的承诺将在你调用上面的代码时直接开始运行.要解决这个问题,您需要使用Defer():

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queue = urls
  .map(url => Observable.defer(() => http.get(url))
  .reduce((acc, curr) => acc.concat(curr));
Run Code Online (Sandbox Code Playgroud)

此方法使用本机数组map将URL转换为Observables,然后将reduce它们连接在一起形成一个大流.

一个更好的解决方案是将您的url放入流中,然后使用mergeMap附加的并发:

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queuedGets = Observable.from(urls)
  .mergeMap(url => http.get(url), null, 1);
Run Code Online (Sandbox Code Playgroud)

这将导致在上一个网址完成后逐个检索网址,但您仍需要在开始之前准备好所有网址.根据您的用例,这可能就足够了.请注意,将mergeMap并发设置1为的a等同于仅使用concatMap

这个难题的最后一部分可能是您需要以自己的速度将新网址推送到队列中.为此,您需要一个主题

主题类似于Observable,但可以多播到许多观察者.主题就像EventEmitters:它们维护着许多听众的注册表.

class HttpGetQueue {
  const queue = new Subject();

  constructor() {
    public results = queue
      .mergeMap(url => http.get(url), null, 1);
  }

  addToQueue(url) {
    queue.next(url);
  }
}

const instance = new HttpGetQueue();
instance.results.subscribe(res => console.log('got res: ' + res);
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
Run Code Online (Sandbox Code Playgroud)

  • `mergeMap` 的第三个参数对我来说是个新闻!文档对 `concurrent` 的细节非常清楚;我建议使用更专业的 [`concatMap`](https://www.learnrxjs.io/operators/transformation/concatmap.html)。 (3认同)
  • 好的,我最终做了以下事情:`addToQueue(url) { var s = new Subject(); queue.next([url, 主题]); 返回 s.asObservable(); }` 和队列:`public results = queue.mergeMap(input =&gt; http.get(input[0]).do(res =&gt; { input[1].next(res); input[1].complete( ); }).catch(e =&gt; { input[1].error(e); return null; }`。似乎可以解决问题。这有意义吗? (2认同)