NodeJS |集群:如何从master向所有或单个子/ worker发送数据?

hto*_*nus 25 cluster-computing node.js

我有节点的工作(库存)脚本

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}
Run Code Online (Sandbox Code Playgroud)

在上面的脚本中,我可以轻松地将数据从worker发送到master进程.但是如何将数据从master发送给worker/workers呢?有了例子,如果可能的话.

ale*_*lex 39

因为cluster.fork是在child_process.fork之上实现的,所以您可以通过使用worker.send({ msg: 'test' }),从工作者到主服务器,将消息从主服务器发送到工作服务器process.send({ msg: 'test' });.您收到如下消息:( worker.on('message', callback)从工人到主人)和process.on('message', callback);(从主人到工人).

这是我的完整示例,您可以通过浏览http:// localhost:8000 /来测试它然后工作人员将向主人发送消息,主人将回复:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
Run Code Online (Sandbox Code Playgroud)

  • 听起来不错,请确保在RedisStore中使用Socket.IO. (2认同)
  • 这行不通。`for`里面的`var`?`worker` 将持有最后一个分叉的工人,而不是每一个(特别是在事件回调中)。要么你不关心所有,你只是附上你的回调,要么你把所有的工人都放在 Array 中。 (2认同)

Kev*_*lly 8

我在寻找一种向所有子进程发送消息的方法时找到了这个线程,幸好能够通过对数组的注释来解决这个问题.只是想说明一种利用这种方法向所有子进程发送消息的潜在解决方案.

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) {
  // Broadcast a message to all workers
  var broadcast = function() {
    for (var i in workers) {
      var worker = workers[i];
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    }
  }

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        }
    });

    // Add the worker to an array of known workers
    workers.push(worker);
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch(msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}
Run Code Online (Sandbox Code Playgroud)


Liq*_*ony 7

这是我如何实施类似问题的解决方案。通过连接到cluster.on('fork'),您可以在分叉工人时将消息处理程序附加到他们(而不是将它们存储在数组中),这具有处理工人死亡或断开连接以及分叉新工人的情况的额外优势。

此代码段将从 master 向所有worker发送消息。

if (cluster.isMaster) {
    for (var i = 0; i < require('os').cpus.length; i++) {
        cluster.fork();
    }

    cluster.on('disconnect', function(worker) {
        cluster.fork();
    }

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    }

    var messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };
}
else {
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };
}
Run Code Online (Sandbox Code Playgroud)