在Node.js中执行并行处理的最佳方法

Bar*_*den 20 node.js

我正在尝试编写一个小节点应用程序,它将搜索并解析文件系统上的大量文件.为了加快搜索速度,我们尝试使用某种map reduce.该计划将是以下简化方案:

  • Web请求带有搜索查询
  • 启动3个进程,每个进程分配1000个(不同的)文件
  • 一旦一个进程完成,它就会将它的结果"返回"回主线程
  • 一旦所有进程完成,主线程将继续返回组合结果作为JSON结果

我对此的问题是: 这在Node中是否可行?这样做的推荐方法是什么?

我一直在摆弄,但是请不要再使用Process了解示例:

发起者:

function Worker() { return child_process.fork("myProcess.js); }
for(var i = 0; i < require('os').cpus().length; i++){
        var process = new Worker();
        process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}
Run Code Online (Sandbox Code Playgroud)

myProcess.js

process.on('message', function(msg) {
    var valuesToReturn = [];
    // Do file reading here
    //How would I return valuesToReturn?
    process.exit(0);
}
Run Code Online (Sandbox Code Playgroud)

几个旁注:

  • 我知道进程数应该取决于服务器上CPU的数量
  • 我也知道文件系统中的速度限制.在我们将其移动到数据库或Lucene实例之前,请将其视为概念验证:-)

rob*_*lep 17

应该可行.举个简单的例子:

// parent.js
var child_process = require('child_process');

var numchild  = require('os').cpus().length;
var done      = 0;

for (var i = 0; i < numchild; i++){
  var child = child_process.fork('./child');
  child.send((i + 1) * 1000);
  child.on('message', function(message) {
    console.log('[parent] received message from child:', message);
    done++;
    if (done === numchild) {
      console.log('[parent] received all results');
      ...
    }
  });
}

// child.js
process.on('message', function(message) {
  console.log('[child] received message from server:', message);
  setTimeout(function() {
    process.send({
      child   : process.pid,
      result  : message + 1
    });
    process.disconnect();
  }, (0.5 + Math.random()) * 5000);
});
Run Code Online (Sandbox Code Playgroud)

因此父进程生成X个子进程并向它们传递消息.它还安装了一个事件处理程序来监听从子进程发回的任何消息(例如,结果).

子进程等待来自父进程的消息,并开始处理(在这种情况下,它只是启动一个带有随机超时的计时器来模拟正在完成的一些工作).完成后,它会将结果发送回父进程,并使用它与父进程process.disconnect()断开连接(基本上停止子进程).

父进程会跟踪已启动的子进程数以及已发回结果的子进程数.当这些数字相等时,父级接收子进程的所有结果,以便它可以组合所有结果并返回JSON结果.


Roo*_*ool 7

对于像这样的分布式问题,我使用了 zmq 并且效果很好。我会给你一个我遇到的类似问题,并试图通过进程解决(但失败了。)然后转向 zmq。

使用 bcrypt 或昂贵的散列算法是明智的,但它会阻止节点进程大约 0.5 秒。我们不得不将其卸载到不同的服务器,作为快速修复,我基本上完全使用了您所做的。运行子进程并向其发送消息并使其响应。我们发现的唯一问题是无论出于何种原因,我们的子进程在完全不工作时会固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们进行了跟踪,似乎 epoll 在 stdout 上失败了/stdin 流。它也只会发生在我们的 Linux 机器上,并且在 OSX 上也能正常工作。)

编辑:

核心的固定已在https://github.com/joyent/libuv/commit/12210fe中修复并且与https://github.com/joyent/node/issues/5504相关,因此如果您遇到问题并且您正在使用 centos + kernel v2.6.32:更新节点,或更新您的内核!

不管我对 child_process.fork() 有什么问题,这是我一直使用的一个漂亮的模式

客户:

var child_process = require('child_process');

function FileParser() {

    this.__callbackById = [];
    this.__callbackIdIncrement = 0;
    this.__process = child_process.fork('./child');
    this.__process.on('message', this.handleMessage.bind(this));

}

FileParser.prototype.handleMessage = function handleMessage(message) {

    var error = message.error;
    var result = message.result;
    var callbackId = message.callbackId;
    var callback = this.__callbackById[callbackId];

    if (! callback) {
        return;
    }
    callback(error, result);
    delete this.__callbackById[callbackId];

};

FileParser.prototype.parse = function parse(data, callback) {

    this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
    this.__callbackById[this.__callbackIdIncrement] = callback;
    this.__process.send({
        data: data, // optionally you could pass in the path of the file, and open it in the child process.
        callbackId: this.__callbackIdIncrement
    });

};

module.exports = FileParser;
Run Code Online (Sandbox Code Playgroud)

子进程:

process.on('message', function(message) {

    var callbackId = message.callbackId;
    var data = message.data;
    function respond(error, response) {
        process.send({
            callbackId: callbackId,
            error: error,
            result: response
        });
    }
    // parse data..
    respond(undefined, "computed data");
});
Run Code Online (Sandbox Code Playgroud)

我们还需要一个模式来同步不同的进程,当每个进程完成它的任务时,它会回应我们,我们会为每个完成的进程增加一个计数,然后当我们命中时调用信号量的回调我们想要的计数。

function Semaphore(wait, callback) {

    this.callback = callback;
    this.wait = wait;
    this.counted = 0;

}

Semaphore.prototype.signal = function signal() {
    this.counted++;
    if (this.counted >= this.wait) {
        this.callback();
    }
}

module.exports = Semaphore;
Run Code Online (Sandbox Code Playgroud)

这是一个将上述所有模式联系在一起的用例:

var FileParser = require('./FileParser');
var Semaphore  = require('./Semaphore');

var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
    var fileParser = new FileParser();
    arrFileParsers.push(fileParser);
}

function getFiles() {
    return ["file", "file"];
}

var arrResults = [];
function onAllFilesParsed() {
    console.log('all results completed', JSON.stringify(arrResults));
}

var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
    var arrFiles  = getFiles(); // you need to decide how to split the files into 1k chunks
    fileParser.parse(arrFiles, function (error, result) {
        arrResults.push(result);
        lock.signal();
    });
});
Run Code Online (Sandbox Code Playgroud)

最终我使用了http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用的是 nodejs zmq 客户端,而 worker/broker 是用 C 编写的。这允许我们进行扩展这跨多台机器,而不仅仅是具有子进程的本地机器。