Concurrency with Rx.js and promises

met*_*jus 2 javascript concurrency promise rxjs

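I want to process an array of objects through a series of async/network operations (remote HTTP requests).

In some of those operations, I want to make sure that no more than X items are being processed at the same time.

How can I achieve this?

Example code: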

function someAsyncOp(item) {...} // returns a promise

var source = Rx.Observable.from([{item1},{item2},...])
source
  .flatMap((item) => {

    // I WANT THE FOLLOWING OPERATION TO BE EXECUTING  
    // ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD
    // BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A 
    // RESULT OF THE PROMISE SUCCEEDING OR FAILING

    return Rx.Observable.fromPromise(someAsyncOp(item))

  })
  .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed')
  )

pau*_*els 5

flatMap has a sibling called flatMapWithMaxConcurrent, which takes a concurrency parameter. It is functionally similar to the map(fn).merge(n) that Benjamin's answer suggests.

function someAsyncOp(item) {...} // returns a promise

var source = Rx.Observable.from([{item1},{item2},...])
source
  // Only allow a max of 10 items to be subscribed to at once
  .flatMapWithMaxConcurrent(10, (item) => {

    // Since a promise is eager, you need to defer execution of the function
    // that produces it until subscription. defer implicitly accepts a promise.
    return Rx.Observable.defer(() => someAsyncOp(item))

      // If you want the whole thing to continue regardless of exceptions, you
      // should also catch errors from the individual processes
      .catch(Rx.Observable.empty())
  })
  .subscribe(
    console.log, 
    console.error, 
    () => console.log('completed')
  )
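To make the two points above concrete (work must be started lazily, and a slot is refilled as soon as a promise settles, whether it succeeds or fails), here is a minimal sketch using plain promises with no Rx dependency. It is not how flatMapWithMaxConcurrent is implemented. The names mapWithMaxConcurrent and fakeAsyncOp are hypothetical, and fakeAsyncOp only stands in for the question's someAsyncOp.

```javascript
// Hypothetical helper (not part of Rx): run `task` over `items`,
// keeping at most `limit` promises in flight at any time.
function mapWithMaxConcurrent(items, limit, task) {
  const results = new Array(items.length);
  let next = 0; // index of the next item that has not been started yet

  // Each worker starts the next unstarted item as soon as its previous
  // promise settles, so a freed slot is refilled immediately.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      try {
        // `task` is only called here, which plays the role of defer:
        // the promise does not exist until a slot is available.
        const value = await task(items[index], index);
        results[index] = { status: 'fulfilled', value };
      } catch (reason) {
        // A failure frees the slot too; record it and keep going.
        results[index] = { status: 'rejected', reason };
      }
    }
  }

  const count = Math.min(limit, items.length);
  const workers = Array.from({ length: count }, () => worker());
  return Promise.all(workers).then(() => results);
}

// Stand-in for someAsyncOp: settles after a short delay, rejects for
// item 3, and tracks how many calls overlap.
let inFlight = 0;
let peak = 0;
function fakeAsyncOp(item) {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      inFlight--;
      if (item === 3) {
        reject(new Error('item 3 failed'));
      } else {
        resolve(item * 10);
      }
    }, 5);
  });
}

const demo = mapWithMaxConcurrent([1, 2, 3, 4, 5, 6, 7], 3, fakeAsyncOp);
demo.then((results) => {
  console.log('peak concurrency:', peak);
  console.log(results.map((r) => r.status));
});
```

Passing a function that creates the promise, rather than an array of promises that have already been created, is what makes the limit effective: a promise that already exists has already started its work.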