并行运行异步任务

Pav*_*kic 5 javascript rxjs

在 RxJS 中,当您想按顺序运行 http 请求时 - 将它们链接起来。但我不清楚如何并行运行请求?我在http://reactive-extensions.github.io/learnrx/的示例中看到他们使用 Observable.zip() 并行运行 2 个请求。但是如何并行运行 5 个请求呢?更具体地说,我如何设置以便调用我的函数:

  • 当所有 5 完成?
  • 什么时候完成?

小智 8

使用combineLatestforkJoin

// Assume you have an array of urls
const urls = [
  "twitter.com/puppies.json", 
  "google.com/puppies.json", 
  "facebook.com/puppies.json"
];

// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))

// Now combine the result from each request into an observable
// Here's combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...urls)
// Alternatively, here's forkJoin:
const allThePuppies$ = Rx.Observable.forkJoin(urls)


// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
  const twitterPuppies, googlePuppies, facebookPuppies = results;
  // Do what you must with the respective responses
  // (Presumably in this example you'd show your users some adorable pics of puppies)
})
Run Code Online (Sandbox Code Playgroud)

combineLatest 接收任意数量的 observable,一旦它们中的每一个都发出了至少一个值,当这些 observable 中的任何一个触发时,它将从每个 observable 中发出一个包含最新值的数组。

不过,这太抽象了。出于我们的目的,我们知道少数 ajax 请求实际上只会发出一次。因此,如果我们使用combineLatest少量 ajax observable,我们最终会得到一个 observable,它从每个 ajax 请求发出一组结果。

forkJoin类似于combineLatest,但它只在其每个组成可观察对象完成后才发出其响应数组。


And*_*eng 1

这是一个很老的问题,但没有公认的答案。您正在寻找的答案可能出奇地简单:concatMap

当 Promise 创建后,它立即开始执行,因此它们是并行执行的;而当从一个可观察对象发出值时,它们是串行的。

因此,将这两者结合起来,对于以下代码片段,Promise 中的可观察量是并行执行的,并且它们的结果是串行发出的,因为 concatMap 按照它们创建的顺序将它们放入一个流中。

Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
  function(jsonObj) {
    // first result will arrive first
  },
  function(err) { },
  function() {
    // all completed
  }
)
Run Code Online (Sandbox Code Playgroud)