我正在尝试编写一个小节点应用程序,它将搜索并解析文件系统上的大量文件.为了加快搜索速度,我们尝试使用某种map reduce.该计划将是以下简化方案:
我对此的问题是: 这在Node中是否可行?这样做的推荐方法是什么?
我一直在摆弄,但是请不要再使用Process了解示例:
发起者:
function Worker() { return child_process.fork("myProcess.js); }
for(var i = 0; i < require('os').cpus().length; i++){
var process = new Worker();
process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}
Run Code Online (Sandbox Code Playgroud)
myProcess.js
process.on('message', function(msg) {
var valuesToReturn = [];
// Do file reading here
//How would I return valuesToReturn?
process.exit(0);
}
Run Code Online (Sandbox Code Playgroud)
几个旁注:
rob*_*lep 17
应该可行.举个简单的例子:
// parent.js
var child_process = require('child_process');
var numchild = require('os').cpus().length;
var done = 0;
for (var i = 0; i < numchild; i++){
var child = child_process.fork('./child');
child.send((i + 1) * 1000);
child.on('message', function(message) {
console.log('[parent] received message from child:', message);
done++;
if (done === numchild) {
console.log('[parent] received all results');
...
}
});
}
// child.js
process.on('message', function(message) {
console.log('[child] received message from server:', message);
setTimeout(function() {
process.send({
child : process.pid,
result : message + 1
});
process.disconnect();
}, (0.5 + Math.random()) * 5000);
});
Run Code Online (Sandbox Code Playgroud)
因此父进程生成X个子进程并向它们传递消息.它还安装了一个事件处理程序来监听从子进程发回的任何消息(例如,结果).
子进程等待来自父进程的消息,并开始处理(在这种情况下,它只是启动一个带有随机超时的计时器来模拟正在完成的一些工作).完成后,它会将结果发送回父进程,并使用它与父进程process.disconnect()断开连接(基本上停止子进程).
父进程会跟踪已启动的子进程数以及已发回结果的子进程数.当这些数字相等时,父级接收子进程的所有结果,以便它可以组合所有结果并返回JSON结果.
对于像这样的分布式问题,我使用了 zmq 并且效果很好。我会给你一个我遇到的类似问题,并试图通过进程解决(但失败了。)然后转向 zmq。
使用 bcrypt 或昂贵的散列算法是明智的,但它会阻止节点进程大约 0.5 秒。我们不得不将其卸载到不同的服务器,作为快速修复,我基本上完全使用了您所做的。运行子进程并向其发送消息并使其响应。我们发现的唯一问题是无论出于何种原因,我们的子进程在完全不工作时会固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们进行了跟踪,似乎 epoll 在 stdout 上失败了/stdin 流。它也只会发生在我们的 Linux 机器上,并且在 OSX 上也能正常工作。)
编辑:
核心的固定已在https://github.com/joyent/libuv/commit/12210fe中修复并且与https://github.com/joyent/node/issues/5504相关,因此如果您遇到问题并且您正在使用 centos + kernel v2.6.32:更新节点,或更新您的内核!
不管我对 child_process.fork() 有什么问题,这是我一直使用的一个漂亮的模式
客户:
var child_process = require('child_process');
function FileParser() {
this.__callbackById = [];
this.__callbackIdIncrement = 0;
this.__process = child_process.fork('./child');
this.__process.on('message', this.handleMessage.bind(this));
}
FileParser.prototype.handleMessage = function handleMessage(message) {
var error = message.error;
var result = message.result;
var callbackId = message.callbackId;
var callback = this.__callbackById[callbackId];
if (! callback) {
return;
}
callback(error, result);
delete this.__callbackById[callbackId];
};
FileParser.prototype.parse = function parse(data, callback) {
this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
this.__callbackById[this.__callbackIdIncrement] = callback;
this.__process.send({
data: data, // optionally you could pass in the path of the file, and open it in the child process.
callbackId: this.__callbackIdIncrement
});
};
module.exports = FileParser;
Run Code Online (Sandbox Code Playgroud)
子进程:
process.on('message', function(message) {
var callbackId = message.callbackId;
var data = message.data;
function respond(error, response) {
process.send({
callbackId: callbackId,
error: error,
result: response
});
}
// parse data..
respond(undefined, "computed data");
});
Run Code Online (Sandbox Code Playgroud)
我们还需要一个模式来同步不同的进程,当每个进程完成它的任务时,它会回应我们,我们会为每个完成的进程增加一个计数,然后当我们命中时调用信号量的回调我们想要的计数。
function Semaphore(wait, callback) {
this.callback = callback;
this.wait = wait;
this.counted = 0;
}
Semaphore.prototype.signal = function signal() {
this.counted++;
if (this.counted >= this.wait) {
this.callback();
}
}
module.exports = Semaphore;
Run Code Online (Sandbox Code Playgroud)
这是一个将上述所有模式联系在一起的用例:
var FileParser = require('./FileParser');
var Semaphore = require('./Semaphore');
var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
var fileParser = new FileParser();
arrFileParsers.push(fileParser);
}
function getFiles() {
return ["file", "file"];
}
var arrResults = [];
function onAllFilesParsed() {
console.log('all results completed', JSON.stringify(arrResults));
}
var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
var arrFiles = getFiles(); // you need to decide how to split the files into 1k chunks
fileParser.parse(arrFiles, function (error, result) {
arrResults.push(result);
lock.signal();
});
});
Run Code Online (Sandbox Code Playgroud)
最终我使用了http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用的是 nodejs zmq 客户端,而 worker/broker 是用 C 编写的。这允许我们进行扩展这跨多台机器,而不仅仅是具有子进程的本地机器。
| 归档时间: |
|
| 查看次数: |
17063 次 |
| 最近记录: |