Socket.io,集群,表达和同步事件

thr*_*n19 9 cluster-computing node.js express socket.io

我有一个大问题的一个星期.我尝试转换我的node.JS项目实际上在单核上运行到多核与集群.

使用websockets,此时,我对事件没有任何问题,但是,对于xhr-polling或jsonp-polling,我在集群模式下遇到socket.io的大问题.

这是我的服务器配置:

00 generic.js

'use strict';

var http            = require('http'),
    os              = require('os'),
    cluster         = require('cluster');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    if (cluster.isMaster) {
        console.info('Creating HTTP server cluster with %d workers', size);

        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        http.createServer(app).listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};
Run Code Online (Sandbox Code Playgroud)

03-socket.io.js

"use strict";
var _               = require('underscore'),
    socketio        = require('socket.io'),
    locomotive      = require('locomotive'),
    RedisStore      = require("socket.io/lib/stores/redis"),
    redis           = require("socket.io/node_modules/redis"),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket'),
    config          = require(__dirname + '/../app/global'),
    cluster         = require('cluster');

module.exports = function () {
    if (!cluster.isMaster) {
        this.io = socketio.listen(this.server);

        var pub             = redis.createClient(),
            sub             = redis.createClient(),
            client          = redis.createClient();

        this.io.enable('browser client minification');  // send minified client
        this.io.enable('browser client etag');          // apply etag caching logic based on version number
        this.io.enable('browser client gzip');          // gzip the file

        this.io.set("store", new RedisStore({
            redisPub        : pub,
            redisSub        : sub,
            redisClient     : client
        }));
        this.io.set('log level', 2);
        this.io.set('transports', [
            'websocket',
            'jsonp-polling'
        ]);
        this.io.set('close timeout', 24*60*60);
        this.io.set('heartbeat timeout', 24*60*60);

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));
    }
};
Run Code Online (Sandbox Code Playgroud)

通过轮询,客户端会不时地在与启动的侦听器不同的进程上进行连接.同样,通信服务器向客户端发射.

通过一点搜索,我发现有必要通过store.io的商店来共享数据连接.所以我构建了RedisStore socket.io,如文档中所示,但即便如此,我发现自己的事件没有安全到达,我仍然收到此错误消息:

warn: client not handshaken client should reconnect
Run Code Online (Sandbox Code Playgroud)

编辑

现在,没有调用警告错误.我将redisStore更改为socket.io-clusterhub但是现在,并不总是调用事件.有时,好像轮询请求被另一个工作者捕获,而不是开始听众,因此没有任何反应.这是新配置:

'use strict';

var http            = require('http'),
    locomotive      = require('locomotive'),
    os              = require('os'),
    cluster         = require('cluster'),
    config          = require(__dirname + '/../app/global'),
    _               = require('underscore'),
    socketio        = require('socket.io'),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    this.clusterStore = new (require('socket.io-clusterhub'));

    if (cluster.isMaster) {
        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        var server = http.createServer(app);

        this.io = socketio.listen(server);

        this.io.configure(function() {
            this.io.enable('browser client minification');  // send minified client
            this.io.enable('browser client etag');          // apply etag caching logic based on version number
            this.io.enable('browser client gzip');          // gzip the file

            this.io.set('store', this.clusterStore);
            this.io.set('log level', 2);
            this.io.set('transports', [
                'websocket',
                'jsonp-polling'
            ]);
            //this.io.set('close timeout', 24*60*60);
            //this.io.set('heartbeat timeout', 24*60*60);
        }.bind(this));

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);
            console.log('connected to worker: ' + cluster.worker.id);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));

        server.listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};
Run Code Online (Sandbox Code Playgroud)

Lud*_*c C 2

来自该来源: http: //socket.io/docs/using-multiple-nodes/

\n\n
\n

如果您计划在不同的进程或计算机之间分配连接负载,则必须确保与特定会话 ID 关联的请求连接到发起它们的进程。

\n\n

这是由于 XHR 轮询或 JSONP 轮询等某些传输依赖于在\n \xe2\x80\x9csocket\xe2\x80\x9d 的生命周期内触发多个请求。

\n
\n\n

每次将连接路由到同一个工作人员:

\n\n

粘性会话

\n\n

在 socket.io 文档中,这是每次将请求路由到同一工作线程的推荐方法。

\n\n

https://github.com/indutny/sticky-session

\n\n
\n

一种在集群中使用 socket.io 的简单高效方法。

\n\n

Socket.io 正在执行多个请求来执行握手并\n 与客户端建立连接。对于集群,这些请求可能会到达不同的工作线程,这将破坏握手协议。

\n
\n\n
var sticky = require(\'sticky-sesion\');\n\nsticky(function() {\n  // This code will be executed only in slave workers\n\n  var http = require(\'http\'),\n      io = require(\'socket.io\');\n\n  var server = http.createServer(function(req, res) {\n    // ....\n  });\n  io.listen(server);\n\n  return server;\n}).listen(3000, function() {\n  console.log(\'server started on 3000 port\');\n});\n
Run Code Online (Sandbox Code Playgroud)\n\n

在节点之间传递消息:

\n\n

socket.io-redis

\n\n

在 socket.io 文档中,这是在工作人员之间共享消息的推荐方法。

\n\n

https://github.com/automattic/socket.io-redis

\n\n
\n

通过使用socket.io-redis适配器运行socket.io,您可以在不同的进程或服务器中运行多个socket.io实例,这些实例都可以相互广播和发出事件。

\n
\n\n

socket.io-redis 的使用方式如下:

\n\n
var io = require(\'socket.io\')(3000);\nvar redis = require(\'socket.io-redis\');\nio.adapter(redis({ host: \'localhost\', port: 6379 }));\n
Run Code Online (Sandbox Code Playgroud)\n\n

\n\n

我认为您没有使用 socket.io v1.0.0。您可能需要更新版本以获得更高的稳定性。

\n\n

您可以在http://socket.io/docs/migration-from-0-9/查看他们的迁移指南

\n

  • @ludo - 请使用粘性会话和快递的工作版本更新您的答案。我还没有找到一个有效的例子:https://github.com/indutny/sticky-session/issues/7 (2认同)