mup*_*n82 2 javascript node.js
我的程序正在与一个只接受每秒约10个请求的Web服务进行通信.我的程序不时向Web服务发送100多个并发请求,导致程序崩溃.
如何将Node.js中的并发请求限制为每秒5个?我正在使用请求库.
// IF EVENT AND SENDER
if(data.sender[0].events && data.sender[0].events.length > 0) {
// FIND ALL EVENTS
for(var i = 0; i < data.sender[0].events.length; i++) {
// IF TYPE IS "ADDED"
if(data.sender[0].events[i].type == "added") {
switch (data.sender[0].events[i].link.rel) {
case "contact" :
batch("added", data.sender[0].events[i].link.href);
//_initContacts(data.sender[0].events[i].link.href);
break;
}
// IF TYPE IS "UPDATED"
} else if(data.sender[0].events[i].type == "updated") {
switch (data.sender[0].events[i].link.rel){
case "contactPresence" :
batch("updated", data.sender[0].events[i].link.href);
//_getContactPresence(data.sender[0].events[i].link.href);
break;
case "contactNote" :
batch("updated", data.sender[0].events[i].link.href);
// _getContactNote(data.sender[0].events[i].link.href);
break;
case "contactLocation" :
batch("updated", data.sender[0].events[i].link.href);
// _getContactLocation(data.sender[0].events[i].link.href);
break;
case "presenceSubscription" :
batch("updated", data.sender[0].events[i].link.href);
// _extendPresenceSubscription(data.sender[0].events[i].link.href);
break;
}
}
};
Run Code Online (Sandbox Code Playgroud)
然后是本土的批处理方法:
var updated = [];
var added = [];
var batch = function(type, url){
console.log("batch called");
if (type === "added"){
console.log("Added batched");
added.push(url);
if (added.length > 5) {
setTimeout(added.forEach(function(req){
_initContacts(req);
}), 2000);
added = [];
}
}
else if (type === "updated"){
console.log("Updated batched");
updated.push(url);
console.log("Updated length is : ", updated.length);
if (updated.length > 5){
console.log("Over 5 updated events");
updated.forEach(function(req){
setTimeout(_getContactLocation(req), 2000);
});
updated = [];
}
}
};
Run Code Online (Sandbox Code Playgroud)
以及实际请求的示例:
var _getContactLocation = function(url){
r.get(baseUrl + url,
{ "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }},
function(err, res, body){
if(err)
console.log(err);
else {
var data = JSON.parse(body);
self.emit("data.contact", data);
}
}
);
};
Run Code Online (Sandbox Code Playgroud)
使用异步库,该mapLimit功能完全符合您的要求.由于您未提供任何代码,因此无法为您的特定用例提供示例.
从自述文件:
与map相同的只是"limit"迭代器将在任何时候同时运行.
请注意,这些项目不是批量处理的,因此无法保证第一个"限制"迭代器函数在启动任何其他函数之前完成.
参数
例
async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){
// results is now an array of stats for each file
});
编辑:既然你提供了代码,我看到你的使用与我的假设有点不同.async当您知道要预先运行的所有任务时,库会更有用.我不知道有一个库可以轻松解决这个问题.上述说明可能仍然与搜索此主题的人员相关,因此我将其保留.
抱歉,我没有时间重构您的代码,但这是一个(未经测试的)函数示例,该函数发出异步请求,同时自我限制为每秒5个请求.我强烈建议您使用此方法来提供适合您代码库的更通用的解决方案.
var throttledRequest = (function () {
var queue = [], running = 0;
function sendPossibleRequests() {
var url;
while (queue.length > 0 && running < 5) {
url = queue.shift();
running++;
r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
running--;
sendPossibleRequests();
if(err)
console.log(err);
else {
var data = JSON.parse(body);
self.emit("data.contact", data);
}
});
}
}
return function (url) {
queue.push(url);
sendPossibleRequests();
};
})();
Run Code Online (Sandbox Code Playgroud)
基本上,您保留所有数据的队列以进行异步处理(例如要请求的URL),然后在每次回调后(从请求中),您尝试启动尽可能多的剩余请求.
这正是节点Agent类旨在解决的问题.你做过傻事require('http').globalAgent.maxSockets = Number.MAX_VALUE或agent: false作为请求选项传递的东西吗?
使用Node的默认行为,您的程序一次不会发送超过5个并发请求.此外,代理还提供了简单队列无法实现的优化(即HTTP keepalive).
如果您尝试发出许多请求(例如,从循环发出100个请求),前5个将开始,代理将排队剩余的95个.当请求完成时,它将启动下一个请求.
您可能想要做的是Agent为您的Web服务请求创建一个,并将其传递给每个请求的调用(而不是将请求与全局代理混合).
var http=require('http'), svcAgent = http.Agent();
request({ ... , agent: svcAgent });
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8525 次 |
| 最近记录: |