Node.js中的批处理请求

mup*_*n82 2 javascript node.js

我的程序正在与一个只接受每秒约10个请求的Web服务进行通信.我的程序不时向Web服务发送100多个并发请求,导致程序崩溃.

如何将Node.js中的并发请求限制为每秒5个?我正在使用请求库.

 // IF EVENT AND SENDER
    if(data.sender[0].events && data.sender[0].events.length > 0) {


        // FIND ALL EVENTS
        for(var i = 0; i < data.sender[0].events.length; i++) {

            // IF TYPE IS "ADDED"
            if(data.sender[0].events[i].type == "added") {
                switch (data.sender[0].events[i].link.rel) {
                    case "contact" :
                        batch("added", data.sender[0].events[i].link.href);
                        //_initContacts(data.sender[0].events[i].link.href);
                        break;
                } 
            // IF TYPE IS "UPDATED"
            } else if(data.sender[0].events[i].type == "updated") {

                switch (data.sender[0].events[i].link.rel){                     
                    case "contactPresence" :
                        batch("updated", data.sender[0].events[i].link.href);
                        //_getContactPresence(data.sender[0].events[i].link.href);
                        break;
                    case "contactNote" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactNote(data.sender[0].events[i].link.href);
                        break;
                    case "contactLocation" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactLocation(data.sender[0].events[i].link.href);
                        break;
                    case "presenceSubscription" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _extendPresenceSubscription(data.sender[0].events[i].link.href);
                        break;
                }
            }
        };
Run Code Online (Sandbox Code Playgroud)

然后是本土的批处理方法:

var updated = [];
var added = [];

var batch = function(type, url){
    console.log("batch called");


    if (type === "added"){
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            setTimeout(added.forEach(function(req){
                _initContacts(req);
            }), 2000);
            added = [];
        }
    } 
    else if (type === "updated"){
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5){
            console.log("Over 5 updated events");
            updated.forEach(function(req){
                setTimeout(_getContactLocation(req), 2000);
            });
            updated = [];
        }
    }       
};
Run Code Online (Sandbox Code Playgroud)

以及实际请求的示例:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};
Run Code Online (Sandbox Code Playgroud)

Geo*_*rge 7

使用异步库,该mapLimit功能完全符合您的要求.由于您未提供任何代码,因此无法为您的特定用例提供示例.

从自述文件:


mapLimit(arr,limit,iterator,callback)

与map相同的只是"limit"迭代器将在任何时候同时运行.

请注意,这些项目不是批量处理的,因此无法保证第一个"限制"迭代器函数在启动任何其他函数之前完成.

参数

  • arr - 要迭代的数组.
  • limit - 随时运行的最大迭代器数.
  • iterator(item,callback) - 一个应用于数组中每个项的函数.迭代器传递一个回调(错误,转换),一旦完成错误(可以为null)和转换项,就必须调用它.
  • callback(err,results) - 在所有迭代器函数完成或发生错误后调用的回调.结果是来自原始数组的变换项的数组.

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){ // results is now an array of stats for each file });


编辑:既然你提供了代码,我看到你的使用与我的假设有点不同.async当您知道要预先运行的所有任务时,库会更有用.我不知道有一个库可以轻松解决这个问题.上述说明可能仍然与搜索此主题的人员相关,因此我将其保留.

抱歉,我没有时间重构您的代码,但这是一个(未经测试的)函数示例,该函数发出异步请求,同时自我限制为每秒5个请求.我强烈建议您使用此方法来提供适合您代码库的更通用的解决方案.

var throttledRequest = (function () {
    var queue = [], running = 0;

    function sendPossibleRequests() {
        var url;
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                running--;
                sendPossibleRequests();

                if(err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();
Run Code Online (Sandbox Code Playgroud)

基本上,您保留所有数据的队列以进行异步处理(例如要请求的URL),然后在每次回调后(从请求中),您尝试启动尽可能多的剩余请求.


jos*_*736 6

这正是节点Agent类旨在解决的问题.你做过傻事require('http').globalAgent.maxSockets = Number.MAX_VALUEagent: false作为请求选项传递的东西吗?

使用Node的默认行为,您的程序一次不会发送超过5个并发请求.此外,代理还提供了简单队列无法实现的优化(即HTTP keepalive).

如果您尝试发出许多请求(例如,从循环发出100个请求),前5个将开始,代理将排队剩余的95个.当请求完成时,它将启动下一个请求.

您可能想要做的是Agent为您的Web服务请求创建一个,并将其传递给每个请求的调用(而不是将请求与全局代理混合).

var http=require('http'), svcAgent = http.Agent();

request({ ... , agent: svcAgent });
Run Code Online (Sandbox Code Playgroud)