异步并行请求按顺序运行

oli*_*rsm 4 javascript parallel-processing asynchronous node.js

我正在使用Node.js运行服务器,需要从我运行的另一台服务器(localhost:3001)请求数据.我需要向数据服务器发出许多请求(约200个)并收集数据(响应大小从约20Kb到约20Mb不等).每个请求都是独立的,我想将所有响应保存为如下形式的一个巨大数组:

[{"urlAAA": responseAAA}, {"urlCCC": responseCCC}, {"urlBBB": responseBBB}, etc ]
Run Code Online (Sandbox Code Playgroud)

请注意,项目的顺序并不重要,理想情况下,它们应按数据可用的顺序填充数组.

var express = require('express');
var router = express.Router();
var async = require("async");
var papa = require("papaparse");
var sync_request = require('sync-request');
var request = require("request");

// Parsed responses are collected in this object.
var pinnacle_data = {};

// Job numbers 0..19, one entry per request to issue.
var lookup_list = Array.from({ length: 20 }, function (_unused, position) {
    return position;
});

/**
 * Store `value` on `object` under the property name `key`.
 *
 * @param {Object} object - Target object; it is modified in place.
 * @param {string} key - Property name to assign.
 * @param {*} value - Value to store under `key`.
 * @returns {undefined} Nothing is returned.
 */
function write_delayed_files(object, key, value) {
    object[key] = value;
}

/**
 * Fetch one randomly chosen file from the data server and store the parsed
 * JSON in `pinnacle_data` under the key "file_<file_number>".
 *
 * The request is made with `sync_request`, whose response object is used
 * directly on the next statement, so each call finishes its request before
 * returning. No completion callback is accepted or invoked.
 *
 * @param {number} file_number - Job number used to build the storage key.
 * @returns {undefined}
 */
var show_file = function (file_number) {
    // Random file index in the range 1..496.
    var random_index = Math.round(Math.random() * 495) + 1;
    var target_url = 'http://localhost:3001/generate?file=' + random_index.toString();

    var body_text = sync_request('GET', target_url).getBody('utf8');
    var parsed_body = JSON.parse(body_text);

    var storage_key = "file_" + file_number.toString();
    pinnacle_data[storage_key] = parsed_body;

    console.log("We've handled file:    " + file_number);
};

// Completion handler for async.each; intentionally does nothing.
function ignore_completion(err) {}

// Route handler for GET '/': renders the 'predictionsWtaLinks' view.
function render_index(req, res, next) {
    res.render('predictionsWtaLinks', {title: 'Async Trial'});
}

// Apply show_file to every entry of lookup_list.
async.each(lookup_list, show_file, ignore_completion);

// Print whatever pinnacle_data holds at this point.
console.log(pinnacle_data);

router.get('/', render_index);

module.exports = router;
Run Code Online (Sandbox Code Playgroud)

现在,当该程序运行时,它显示:

We've handled file:    0
We've handled file:    1
We've handled file:    2
We've handled file:    3
We've handled file:    4
We've handled file:    5
etc
Run Code Online (Sandbox Code Playgroud)

由于文件大小差异很大,我原本期望这些请求会"并行"执行,但它们似乎是按顺序执行的,而这正是我想通过使用async.each()来避免的.目前连接数据服务器大约需要1-2秒,因此对许多文件执行此操作耗时太长.

我意识到我正在使用同步请求,所以想要理想地替换:

// Synchronous request: the response object is returned directly instead of being delivered to a callback.
var response_json = sync_request('GET', pinnacle_file_index);
Run Code Online (Sandbox Code Playgroud)

换成类似下面这样的代码:

request(pinnacle_file_index, function (error, response, body) {
    // Nothing is stored unless the request succeeded with HTTP status 200.
    if (error || response.statusCode != 200) {
        return;
    }
    pinnacle_data[object_key] = JSON.parse(body);
});
Run Code Online (Sandbox Code Playgroud)

任何帮助将非常感激.

另外我看过尝试:

  • 将url列表转换为匿名函数列表并使用async.parallel(function_list, function (err, results) { //add results to pinnacle_data[]});.(我在尝试为数组中的每个元素定义唯一函数时遇到了问题).

同样,我看了其他相关主题:

编辑 - 工作解决方案


以下代码现在可以完成任务(每个请求大约需要80ms,其中包括使用npm包requestretry进行重试的开销).同样,它的扩展性也很好:请求总数从5个到1000个,平均每个请求的耗时都在~80ms左右.

var performance = require("performance-now");
var time_start = performance();
var async = require("async");
var request_retry = require('requestretry');

// Number of requests to issue.
var total_requests = 50;

// Job numbers 0..total_requests-1, one entry per request.
var lookup_list = Array.from({ length: total_requests }, function (_unused, position) {
    return position;
});

// Collected responses, keyed by the requested file index. Indices are drawn at
// random, so a repeated index overwrites the earlier entry and the final
// object can hold fewer entries than total_requests.
var pinnacle_data = {};

// Start one request per entry of lookup_list. Each iterator call reports its
// outcome through `callback`; the final function receives the first error or
// the array of per-request results.
async.map(lookup_list, function (item, callback) {
        // Random file index in the range 1..496.
        var file_index = Math.round(Math.random() * 495) + 1;
        var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index;
        request_retry({
                url: pinnacle_file_index,
                maxAttempts: 20,
                retryDelay: 20,
                retryStrategy: request_retry.RetryStrategies.HTTPOrNetworkError
            },
            function (error, response, body) {
                if (error || response.statusCode !== 200) {
                    // Report the status code when there is no error object, so
                    // the log line is never just "null".
                    var failure = error || response.statusCode;
                    console.log(failure);
                    return callback(failure);
                }

                // A malformed body would otherwise throw inside this callback,
                // outside async.map's control; route it through `callback`.
                var parsed;
                try {
                    parsed = JSON.parse(body);
                } catch (parse_error) {
                    console.log(parse_error);
                    return callback(parse_error);
                }

                // Wrap the payload as { "<file_index>": parsed } so the final
                // callback can merge all results into pinnacle_data.
                var data_array = {};
                data_array[file_index.toString()] = parsed;
                callback(null, data_array);
            });
    },
    function (err, results) {
        var time_finish = performance();
        var elapsed = time_finish - time_start;
        console.log("It took " + elapsed.toFixed(3) + "ms to complete " + total_requests + " requests.");
        console.log("This gives an average rate of " + (elapsed / total_requests).toFixed(3) + " ms/request");
        if (err) {
            console.log("We had an error somewhere.");
            return;
        }

        // Merge every single-key result object into pinnacle_data. Using
        // function parameters here avoids leaking a global `key` variable.
        results.forEach(function (entry) {
            Object.keys(entry).forEach(function (key) {
                pinnacle_data[key] = entry[key];
            });
        });

        var length_array = Object.keys(pinnacle_data).length.toString();
        console.log("We've got all the data, totalling " + length_array + " unique entries.");
    });
Run Code Online (Sandbox Code Playgroud)

谢谢您的帮助.

jfr*_*d00 5

正如您所发现的,async.parallel()只能并行化本身异步的操作.如果操作是同步的,那么由于node.js的单线程特性,操作将一个接一个地运行,而不是并行运行.但是,如果操作本身是异步的,那么async.parallel()(或其他异步方法)将立即启动它们并为您协调结果.

下面是使用async.map()的一个总体思路.我之所以使用async.map(),是因为它接受一个数组作为输入,并按与原始数据相同的顺序生成一个结果数组,同时并行运行所有请求,这似乎符合您的要求:

var async = require("async");
var request = require("request");

// create list of URLs (20 entries, each pointing at a random file index in 1..496)
var lookup_list = Array.from({ length: 20 }, function () {
    var index = Math.round(Math.random() * 495) + 1;
    return 'http://localhost:3001/generate?file=' + index;
});

// Request every URL in lookup_list via async.map and collect the parsed bodies.
async.map(lookup_list, function(url, callback) {
    // iterator function: must report its outcome through `callback`
    request(url, function (error, response, body) {
        if (error || response.statusCode !== 200) {
            // pass on the error object, or the status code when there is none
            return callback(error || response.statusCode);
        }

        // A malformed body would throw inside this callback; report it through
        // `callback` instead. A separate variable also avoids redeclaring the
        // `body` parameter.
        var parsed;
        try {
            parsed = JSON.parse(body);
        } catch (parse_error) {
            return callback(parse_error);
        }

        // do any further processing of the data here
        callback(null, parsed);
    });
}, function(err, results) {
    // completion function
    if (err) {
        // handle error here; log it at minimum so the failure is not silent
        console.error(err);
        return;
    }

    // process all results in the array here
    console.log(results);
    for (var i = 0; i < results.length; i++) {
        // do something with results[i]
    }
});
Run Code Online (Sandbox Code Playgroud)

另外,这是一个使用Bluebird promise的版本,它以类似的方式用Promise.map()遍历初始数组:

var Promise = require("bluebird");
var request = Promise.promisifyAll(require("request"), {multiArgs: true});

// create list of URLs (20 entries, each pointing at a random file index in 1..496)
var lookup_list = Array.from({ length: 20 }, function () {
    var index = Math.round(Math.random() * 495) + 1;
    return 'http://localhost:3001/generate?file=' + index;
});

// Request every URL in lookup_list via Promise.map and collect the parsed bodies.
Promise.map(lookup_list, function(url) {
    // .spread() unpacks the fulfilled value into (response, body)
    return request.getAsync(url).spread(function(response, body) {
        if (response.statusCode !== 200) {
            // Reject with a real Error (carrying the status code) rather than
            // a bare number, so the rejection has a message and stack trace.
            var status_error = new Error("Unexpected HTTP status " + response.statusCode);
            status_error.statusCode = response.statusCode;
            throw status_error;
        }
        // a JSON.parse failure throws here and becomes a rejection as well
        return JSON.parse(body);
    });
}).then(function(results) {
    console.log(results);
    for (var i = 0; i < results.length; i++) {
        // process results[i] here
    }
}, function(err) {
    // process error here; log it at minimum so the failure is not silent
    console.error(err);
});
Run Code Online (Sandbox Code Playgroud)