使用ES6的Promise.all()时限制并发性的最佳方法是什么?

Chr*_*ris 67 javascript node.js es6-promise

我有一些代码在一个列表中进行迭代,该列表从数据库中查询并为该列表中的每个元素发出HTTP请求.该列表有时可能是一个相当大的数字(成千上万),我想确保我没有遇到数千个并发HTTP请求的Web服务器.

此代码的缩写版本目前看起来像这样......

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});
Run Code Online (Sandbox Code Playgroud)

此代码在Node 4.3.2上运行.重申Promise.all一下,可以进行管理,以便在任何给定时间只有一定数量的Promise正在进行中?

dec*_*iar 43

使用 Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
Run Code Online (Sandbox Code Playgroud)

  • 这会批量运行函数,而不是池化运行,其中一个函数在另一个函数完成时立即调用。 (23认同)
  • 这是一个被低估的解决方案。喜欢简单。 (4认同)
  • 花了一秒钟的时间来了解它在做什么,但缺乏更多的上下文,例如它是一个批处理而不是一个池。每次从开头或中间进行拼接时,都会对数组进行重新排序。(浏览器必须重新索引所有项目)理论上性能更好的替代方案是从末尾获取内容,而不是“arr.splice(-100)”如果顺序不重要,也许你可以反转数组:P (4认同)
  • 对于批量运行非常有用。注意:当前批次 100% 完成后,才会开始下一批。 (4认同)
  • 很酷。然而,它只是批处理,这意味着当另一个承诺解决时它不会推送新的承诺。 (2认同)

Mat*_*out 41

P-限制

我将promise并发限制与自定义脚本bluebird,es6-promise-pool和p-limit进行了比较.我相信p-limit有一个最简单,最简单的实现来满足这种需求.查看他们的文档.

要求

在示例中与async兼容

我的例子

在这个例子中,我们需要为数组中的每个URL运行一个函数(比如,可能是一个API请求).这里叫做fetchData().如果我们要处理数千个项目的数组,那么并发对于节省CPU和内存资源肯定是有用的.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();
Run Code Online (Sandbox Code Playgroud)

控制台日志结果是已解析的promises响应数据的数组.

  • 感谢您进行比较。您是否与 https://github.com/rxaviers/async-pool 进行了比较? (3认同)
  • @AndyRay 先生,这个答案已经有 5 年历史了,p-limit 的用途增加了 10 倍。你担心什么成熟度 (3认同)
  • 谢谢这个!这一个简单得多 (2认同)
  • 这是迄今为止我所看到的最好的用于限制同时请求的库。很好的例子,谢谢! (2认同)
  • 除此之外,npm 每周的下载量约为 2000 万,而其他答案中提到的其他库的下载量约为 200-100k。 (2认同)

Tim*_*imo 38

请注意,Promise.all()不会触发承诺开始他们的工作,创建承诺本身.

考虑到这一点,一种解决方案是检查承诺何时解决是否应该启动新承诺或者您是否已经达到极限.

但是,这里真的没有必要重新发明轮子.您可以用于此目的的一个库是es6-promise-pool.从他们的例子:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Run Code Online (Sandbox Code Playgroud)

  • 不幸的是,es6-promise-pool重新发明了Promise而不是使用它们.我建议使用这个简洁的解决方案(如果你已经使用ES6或ES7)https://github.com/rxaviers/async-pool (12认同)
  • 看看两者,异步池看起来更好!更直接,更轻巧。 (3认同)
  • 我还发现 p-limit 是最简单的实现。请参阅下面的示例。/sf/answers/3658341711/ (3认同)
  • 我认为,tiny-asyc-pool 是限制承诺并发性的更好、非侵入性且相当自然的解决方案。 (2认同)

Dan*_*ich 21

正如此答案线程中的所有其他人所指出的那样,Promise.all()如果您需要限制并发性,则不会做正确的事情。但理想情况下,您甚至不应该等到所有Promise完成后再处理它们。

相反,您希望在每个结果可用时尽快对其进行处理,因此您不必等到最后一个 Promise 完成后再开始迭代它们。

因此,这里有一个代码示例,它部分基于Endless 的答案以及TJ Crowder 的答案

编辑:我已经将这个小片段变成了一个库concurrency-limit-runner

// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
    tasks.push(async () => {
        console.log(`start ${i}`);
        await sleep(Math.random() * 1000);
        console.log(`end ${i}`);
        return i;
    });
}
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }

(async () => {
    for await (let value of runTasks(3, tasks.values())) {
        console.log(`output ${value}`);
    }
})();

async function* runTasks(maxConcurrency, taskIterator) {
    async function* createWorkerIterator() {
        // Each AsyncGenerator that this function* creates is a worker,
        // polling for tasks from the shared taskIterator. Sharing the
        // taskIterator ensures that each worker gets unique tasks.
        for (const task of taskIterator) yield await task();
    }

    const asyncIterators = new Array(maxConcurrency);
    for (let i = 0; i < maxConcurrency; i++) {
        asyncIterators[i] = createWorkerIterator();
    }
    yield* raceAsyncIterators(asyncIterators);
}

async function* raceAsyncIterators(asyncIterators) {
    async function nextResultWithItsIterator(iterator) {
        return { result: await iterator.next(), iterator: iterator };
    }
    /** @type Map<AsyncIterator<T>,
        Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
    const promises = new Map();
    for (const iterator of asyncIterators) {
        promises.set(iterator, nextResultWithItsIterator(iterator));
    }
    while (promises.size) {
        const { result, iterator } = await Promise.race(promises.values());
        if (result.done) {
            promises.delete(iterator);
        } else {
            promises.set(iterator, nextResultWithItsIterator(iterator));
            yield result.value;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这里有很多魔法;让我解释。

该解决方案是围绕异步生成器函数构建的,许多 JS 开发人员可能不熟悉这些函数。

生成器函数(又名function*函数)返回一个“生成器”,即结果的迭代器。生成器函数可以yield在您通常使用关键字的地方使用该return关键字。调用者第一次调用next()生成器(或使用循环for...of)时,该function*函数将运行直到它yield达到值为止;这将成为next()迭代器的值。但在接下来的时间next()被调用时,生成器函数会从该yield语句继续,就在它停止的地方,即使它位于循环的中间。(您还可以yield*, 生成另一个生成器函数的所有结果。)

“异步生成器函数”( async function*) 是一个返回“异步迭代器”的生成器函数,“异步迭代器”是 Promise 的迭代器。您可以调用for await...of异步迭代器。异步生成器函数可以使用await关键字,就像您在任何async function.

在示例中,我们runTasks()使用任务函数数组进行调用;我们调用.values()数组将数组转换为迭代器。

runTasks()是一个异步生成器函数,因此我们可以用循环调用它for await...of。每次循环运行时,我们都会处理最新完成的任务的结果。

runTasks()创建 N 个异步迭代器,即“工人”。每个工作人员从共享中轮询任务taskIterator,确保每个工作人员获得唯一的任务。

该示例调用了runTasks3 个并发工作线程,因此同时启动的任务不会超过 3 个。当任何任务完成时,我们立即将下一个任务排队。(这优于“批处理”,在“批处理”中,您一次执行 3 个任务,等待所有三个任务,并且在整个上一个批次完成之前不要开始下一个批次的三个任务。)

runTasks()最后将其异步迭代器与yield* raceAsyncIterators(). raceAsyncIterators()就像Promise.race()但它与 Promise 的 N 个迭代器竞争,而不仅仅是 N 个 Promise;它返回一个异步迭代器,该迭代器产生已解决的 Promise 的结果。

raceAsyncIterators()首先定义promises Map每个迭代器的 a 到 Promise。每个 Promise 都是对迭代结果以及生成它的迭代器的 Promise。

通过promises映射,我们可以获得Promise.race()映射的值,从而为我们提供获胜的迭代结果及其迭代器。如果迭代器完全是done,我们将其从映射中删除;否则,我们将映射中的 Promise 替换promises为迭代器的next()Promise 和yield result.value

总之,runTasks()它是一个异步生成器函数,它产生 N 个并发异步任务迭代器的结果,因此最终用户可以for await (let value of runTasks(3, tasks.values()))在每个结果可用时立即对其进行处理。

  • 我设法使用这个函数来运行最大项目的承诺池,它的性能非常好,而且与“@supercharge/promise-pool”等解决方案相比,它的开销较低,这个答案值得更多投票 (2认同)

Jin*_*hen 13

bluebird的Promise.map可以采用并发选项来控制并行运行的promises数量.有时它比.all因为您不需要创建promise数组更容易.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Run Code Online (Sandbox Code Playgroud)

  • 一切都取决于一件事对您的重要性,以及是否有其他更快/更简单的更好方法。典型的权衡。我会选择易用性和功能超过几 kb,但 YMMV。 (3认同)

tco*_*ooc 6

不使用promises来限制http请求,而是使用node的内置http.Agent.maxSockets.这消除了使用库或编写自己的池代码的要求,并且具有更多的优势,可以更好地控制您的限制.

agent.maxSockets

默认情况下设置为Infinity.确定代理可以为每个源打开的并发套接字数.Origin是'host:port'或'host:port:localAddress'组合.

例如:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);
Run Code Online (Sandbox Code Playgroud)

如果对同一个来源发出多个请求,则设置keepAlive为true 也可能会让您受益(有关详细信息,请参阅上面的文档).

  • 仍然,立即创建数千个闭包并集合套接字似乎不是非常有效? (9认同)

End*_*ess 6

如果您知道迭代器如何工作以及如何使用迭代器,则不需要任何额外的库,因为自己构建自己的并发变得非常容易。让我示范一下:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}
Run Code Online (Sandbox Code Playgroud)

我们可以使用相同的迭代器,并在工作人员之间共享它。
如果您使用.entries().values()是2D [index, value]并发,则将获得一个2D数组,我将在下面演示

const sleep = n => new Promise(rs => setTimeout(rs,n))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const arr = Array.from('abcdefghij')
const workers = new Array(2).fill(arr.entries()).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.all(workers).then(() => console.log('done'))
Run Code Online (Sandbox Code Playgroud)


注意:与示例async-pool相比,它的不同之处在于它产生了两个工作程序,因此,如果一个工作程序由于某种原因在索引5上抛出错误,它不会阻止其他工作程序执行其余工作。因此,您从进行2次并发下降到1次(这样就不会在那里停止了)。因此,要知道所有工作人员何时完成将变得更加困难,因为Promise.all如果一次失败,它将提早保释。所以我的建议是您要捕获doWork函数内部的所有错误

  • 我喜欢这种方法 - 关键的见解是,如果多个异步函数使用“for...of”迭代*相同的*迭代器,它们将无缝协作地处理迭代器的元素,从而产生紧凑且高性能的代码。但我认为这里的示例相当混乱,因此我对其进行了重大重写以突出这一关键见解:https://gist.github.com/fasiha/7f20043a12ce93401d8473aee037d90a(在[TypeScript Playground](https://www.typescriptlang)中运行良好.org/play))。 (3认同)
  • 这绝对是一个很酷的方法!只需确保您的并发数不超过任务列表的长度(如果您无论如何关心结果),因为您最终可能会得到额外的内容! (2认同)
  • 这是最好的答案。没有依赖性。只是纯 JavaScript。做得好。 (2认同)