ES6承诺替换async.eachLimit/async.mapLimit

Dra*_*SAN 6 javascript promise ecmascript-6 es6-promise

异步中,如果我需要对1000个项目应用异步函数,我可以这样做:

async.mapLimit(items, 10, (item, callback) => {
    foo(item, callback);
});
Run Code Online (Sandbox Code Playgroud)

这样只能同时处理10个项目,从而限制了开销并允许控制.

有了ES6的承诺,我可以很轻松地做到:

Promise.all(items.map((item) => {
    return bar(item);
}));
Run Code Online (Sandbox Code Playgroud)

这将同时处理所有1000个项目,这可能会导致很多问题.

我知道Bluebird有办法解决这个问题,但我正在寻找ES6解决方案.

jib*_*jib 10

如果你不关心结果,那么很快就搞定一个:

Promise.eachLimit = async (funcs, limit) => {
  let rest = funcs.slice(limit);
  await Promise.all(funcs.slice(0, limit).map(async func => {
    await func();
    while (rest.length) {
      await rest.shift()();
    }
  }));
};

// Demo:

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function foo(s) {
  await wait(Math.random() * 2000);
  console.log(s);
}

(async () => {
  let funcs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("").map(s => () => foo(s));
  await Promise.eachLimit(funcs, 5);
})();
Run Code Online (Sandbox Code Playgroud)

一个关键性能属性是在任何函数完成后立即运行下一个可用函数。

保存结果

按顺序保留结果可能会使其不那么优雅,但还不错:

Promise.mapLimit = async (funcs, limit) => {
  let results = [];
  await Promise.all(funcs.slice(0, limit).map(async (func, i) => {
    results[i] = await func();
    while ((i = limit++) < funcs.length) {
      results[i] = await funcs[i]();
    }
  }));
  return results;
};

// Demo:

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function foo(s) {
  await wait(Math.random() * 2000);
  console.log(s);
  return s.toLowerCase();
}

(async () => {
  let funcs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("").map(s => () => foo(s));
  console.log((await Promise.mapLimit(funcs, 5)).join(""));
})();
Run Code Online (Sandbox Code Playgroud)

  • 对于不熟悉 await/async 符号的任何人,您可以复制/粘贴定义(在 Demo 上方),然后像习惯一样使用 .then() 正常使用它。 (2认同)

T.J*_*der 3

没有内置任何内容,但您当然可以自己将它们分组到承诺链中,并Promise.all在生成的链数组上使用 a :

const items = /* ...1000 items... */;
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    // What chain do we add it to?
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        // New chain
        chain = promises[chainNum] = Promise.resolve();
    }
    // Add it
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []));
Run Code Online (Sandbox Code Playgroud)

下面是一个示例,显示了在给定时间内有多少个并发 Promise(还显示了每个“链”何时完成,并且只执行 200 个而不是 1,000 个):

const items = /* ...1000 items... */;
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    // What chain do we add it to?
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        // New chain
        chain = promises[chainNum] = Promise.resolve();
    }
    // Add it
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []));
Run Code Online (Sandbox Code Playgroud)
const items = buildItems();
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        chain = promises[chainNum] = Promise.resolve();
    }
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []).map(chain => chain.then(_ => console.log("Chain done"))));
promise.then(_ => console.log("All done"));

function buildItems() {
  const items = [];
  for (let n = 0; n < 200; ++n) {
    items[n] = n;
  }
  return items;
}

var outstanding = 0;
function foo(item) {
  ++outstanding;
  console.log("Starting " + item + " (" + outstanding + ")");
  return new Promise(resolve => {
    setTimeout(_ => {
      --outstanding;
      console.log("Resolving " + item + " (" + outstanding + ")");
      resolve(item);
    }, Math.random() * 500);
  });
}
Run Code Online (Sandbox Code Playgroud)

我应该注意,如果您想跟踪其中每个结果的结果,则必须修改上述内容;它不会尝试跟踪结果(!)。:-)