Chr*_*ris 67 javascript node.js es6-promise
我有一些代码在一个列表中进行迭代,该列表从数据库中查询并为该列表中的每个元素发出HTTP请求.该列表有时可能是一个相当大的数字(成千上万),我想确保我没有遇到数千个并发HTTP请求的Web服务器.
此代码的缩写版本目前看起来像这样......
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Run Code Online (Sandbox Code Playgroud)
此代码在Node 4.3.2上运行.重申Promise.all一下,可以进行管理,以便在任何给定时间只有一定数量的Promise正在进行中?
dec*_*iar 43
使用 Array.prototype.splice
while (funcs.length) {
// 100 at at time
await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
Run Code Online (Sandbox Code Playgroud)
Mat*_*out 41
P-限制
我将promise并发限制与自定义脚本bluebird,es6-promise-pool和p-limit进行了比较.我相信p-limit有一个最简单,最简单的实现来满足这种需求.查看他们的文档.
要求
在示例中与async兼容
我的例子
在这个例子中,我们需要为数组中的每个URL运行一个函数(比如,可能是一个API请求).这里叫做fetchData().如果我们要处理数千个项目的数组,那么并发对于节省CPU和内存资源肯定是有用的.
const pLimit = require('p-limit');
// Example Concurrency of 3 promise at once
const limit = pLimit(3);
let urls = [
"http://www.exampleone.com/",
"http://www.exampletwo.com/",
"http://www.examplethree.com/",
"http://www.examplefour.com/",
]
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
// wrap the function we are calling in the limit function we defined above
return limit(() => fetchData(url));
});
(async () => {
// Only three promises are run at once (as defined above)
const result = await Promise.all(promises);
console.log(result);
})();
Run Code Online (Sandbox Code Playgroud)
控制台日志结果是已解析的promises响应数据的数组.
Tim*_*imo 38
请注意,Promise.all()不会触发承诺开始他们的工作,创建承诺本身.
考虑到这一点,一种解决方案是检查承诺何时解决是否应该启动新承诺或者您是否已经达到极限.
但是,这里真的没有必要重新发明轮子.您可以用于此目的的一个库是es6-promise-pool.从他们的例子:
// On the Web, leave out this line and use the script tag above instead.
var PromisePool = require('es6-promise-pool')
var promiseProducer = function () {
// Your code goes here.
// If there is work left to be done, return the next work item as a promise.
// Otherwise, return null to indicate that all promises have been created.
// Scroll down for an example.
}
// The number of promises to process simultaneously.
var concurrency = 3
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool.
var poolPromise = pool.start()
// Wait for the pool to settle.
poolPromise.then(function () {
console.log('All promises fulfilled')
}, function (error) {
console.log('Some promise rejected: ' + error.message)
})
Run Code Online (Sandbox Code Playgroud)
Dan*_*ich 21
正如此答案线程中的所有其他人所指出的那样,Promise.all()如果您需要限制并发性,则不会做正确的事情。但理想情况下,您甚至不应该等到所有Promise完成后再处理它们。
相反,您希望在每个结果可用时尽快对其进行处理,因此您不必等到最后一个 Promise 完成后再开始迭代它们。
因此,这里有一个代码示例,它部分基于Endless 的答案以及TJ Crowder 的答案。
编辑:我已经将这个小片段变成了一个库concurrency-limit-runner。
// example tasks that sleep and return a number
// in real life, you'd probably fetch URLs or something
const tasks = [];
for (let i = 0; i < 20; i++) {
tasks.push(async () => {
console.log(`start ${i}`);
await sleep(Math.random() * 1000);
console.log(`end ${i}`);
return i;
});
}
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
(async () => {
for await (let value of runTasks(3, tasks.values())) {
console.log(`output ${value}`);
}
})();
async function* runTasks(maxConcurrency, taskIterator) {
async function* createWorkerIterator() {
// Each AsyncGenerator that this function* creates is a worker,
// polling for tasks from the shared taskIterator. Sharing the
// taskIterator ensures that each worker gets unique tasks.
for (const task of taskIterator) yield await task();
}
const asyncIterators = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
asyncIterators[i] = createWorkerIterator();
}
yield* raceAsyncIterators(asyncIterators);
}
async function* raceAsyncIterators(asyncIterators) {
async function nextResultWithItsIterator(iterator) {
return { result: await iterator.next(), iterator: iterator };
}
/** @type Map<AsyncIterator<T>,
Promise<{result: IteratorResult<T>, iterator: AsyncIterator<T>}>> */
const promises = new Map();
for (const iterator of asyncIterators) {
promises.set(iterator, nextResultWithItsIterator(iterator));
}
while (promises.size) {
const { result, iterator } = await Promise.race(promises.values());
if (result.done) {
promises.delete(iterator);
} else {
promises.set(iterator, nextResultWithItsIterator(iterator));
yield result.value;
}
}
}Run Code Online (Sandbox Code Playgroud)
这里有很多魔法;让我解释。
该解决方案是围绕异步生成器函数构建的,许多 JS 开发人员可能不熟悉这些函数。
生成器函数(又名function*函数)返回一个“生成器”,即结果的迭代器。生成器函数可以yield在您通常使用关键字的地方使用该return关键字。调用者第一次调用next()生成器(或使用循环for...of)时,该function*函数将运行直到它yield达到值为止;这将成为next()迭代器的值。但在接下来的时间next()被调用时,生成器函数会从该yield语句继续,就在它停止的地方,即使它位于循环的中间。(您还可以yield*, 生成另一个生成器函数的所有结果。)
“异步生成器函数”( async function*) 是一个返回“异步迭代器”的生成器函数,“异步迭代器”是 Promise 的迭代器。您可以调用for await...of异步迭代器。异步生成器函数可以使用await关键字,就像您在任何async function.
在示例中,我们runTasks()使用任务函数数组进行调用;我们调用.values()数组将数组转换为迭代器。
runTasks()是一个异步生成器函数,因此我们可以用循环调用它for await...of。每次循环运行时,我们都会处理最新完成的任务的结果。
runTasks()创建 N 个异步迭代器,即“工人”。每个工作人员从共享中轮询任务taskIterator,确保每个工作人员获得唯一的任务。
该示例调用了runTasks3 个并发工作线程,因此同时启动的任务不会超过 3 个。当任何任务完成时,我们立即将下一个任务排队。(这优于“批处理”,在“批处理”中,您一次执行 3 个任务,等待所有三个任务,并且在整个上一个批次完成之前不要开始下一个批次的三个任务。)
runTasks()最后将其异步迭代器与yield* raceAsyncIterators(). raceAsyncIterators()就像Promise.race()但它与 Promise 的 N 个迭代器竞争,而不仅仅是 N 个 Promise;它返回一个异步迭代器,该迭代器产生已解决的 Promise 的结果。
raceAsyncIterators()首先定义promises Map每个迭代器的 a 到 Promise。每个 Promise 都是对迭代结果以及生成它的迭代器的 Promise。
通过promises映射,我们可以获得Promise.race()映射的值,从而为我们提供获胜的迭代结果及其迭代器。如果迭代器完全是done,我们将其从映射中删除;否则,我们将映射中的 Promise 替换promises为迭代器的next()Promise 和yield result.value。
总之,runTasks()它是一个异步生成器函数,它产生 N 个并发异步任务迭代器的结果,因此最终用户可以for await (let value of runTasks(3, tasks.values()))在每个结果可用时立即对其进行处理。
Jin*_*hen 13
bluebird的Promise.map可以采用并发选项来控制并行运行的promises数量.有时它比.all因为您不需要创建promise数组更容易.
const Promise = require('bluebird')
function getCounts() {
return Promise.map(users, user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
}, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Run Code Online (Sandbox Code Playgroud)
不使用promises来限制http请求,而是使用node的内置http.Agent.maxSockets.这消除了使用库或编写自己的池代码的要求,并且具有更多的优势,可以更好地控制您的限制.
agent.maxSockets
默认情况下设置为Infinity.确定代理可以为每个源打开的并发套接字数.Origin是'host:port'或'host:port:localAddress'组合.
例如:
var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);
Run Code Online (Sandbox Code Playgroud)
如果对同一个来源发出多个请求,则设置keepAlive为true 也可能会让您受益(有关详细信息,请参阅上面的文档).
如果您知道迭代器如何工作以及如何使用迭代器,则不需要任何额外的库,因为自己构建自己的并发变得非常容易。让我示范一下:
/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()
// loop over all items with for..of
for (const x of iterator) {
console.log('x:', x)
// notices how this loop continues the same iterator
// and consumes the rest of the iterator, making the
// outer loop not logging any more x's
for (const y of iterator) {
console.log('y:', y)
}
}Run Code Online (Sandbox Code Playgroud)
我们可以使用相同的迭代器,并在工作人员之间共享它。
如果您使用.entries()的.values()是2D [index, value]并发,则将获得一个2D数组,我将在下面演示
const sleep = n => new Promise(rs => setTimeout(rs,n))
async function doWork(iterator) {
for (let [index, item] of iterator) {
await sleep(1000)
console.log(index + ': ' + item)
}
}
const arr = Array.from('abcdefghij')
const workers = new Array(2).fill(arr.entries()).map(doWork)
// ^--- starts two workers sharing the same iterator
Promise.all(workers).then(() => console.log('done'))Run Code Online (Sandbox Code Playgroud)
注意:与示例async-pool相比,它的不同之处在于它产生了两个工作程序,因此,如果一个工作程序由于某种原因在索引5上抛出错误,它不会阻止其他工作程序执行其余工作。因此,您从进行2次并发下降到1次(这样就不会在那里停止了)。因此,要知道所有工作人员何时完成将变得更加困难,因为Promise.all如果一次失败,它将提早保释。所以我的建议是您要捕获doWork函数内部的所有错误
| 归档时间: |
|
| 查看次数: |
29865 次 |
| 最近记录: |