Socket.io,Node.js和rethinkdb

Sar*_*hov 3 sockets node.js rethinkdb

我在Socket.io,Node.js和rethinkdb中使用简单的应用程序.在服务器中,我的server.js文件工作约20分钟.20分钟后我的代码停止了. 我的服务器.js:

var express = require('express'),
path = require('path'),
http = require('http'),
io = require('socket.io'),
r = require('rethinkdb');


var app = express();

app.configure(function() {
app.set('port', process.env.PORT || 3000);
app.use(express.logger('dev'));
app.use(express.bodyParser())
app.use(express.static(path.join(__dirname, 'public')));
});

var server = http.createServer(app);
io = io.listen(server);


server.listen(app.get('port'), function() {
console.log("Express server listening on port " + app.get('port'));
});

var count = 0;
var say = 0;
io.sockets.on('connection', function(socket) {

count++;
socket.on('message', function(message) {
    url = message;
    //socket.join(url);
    r.connect({
        host: 'localhost',
        port: 28015
    }, function(err, conn) {
        if (err) throw err;
        r.db('test').table('online').insert({
            socket_id: socket.id,
            news_id: message
        }).run(conn, function(err) {
            if (err) throw err;
            r.db('test').table("online").filter({
                news_id: message
            }).count().run(conn, function(err, res) {
                if (err) throw err;
                say = res;


                console.log("connect say" + say);
                // ip = socket.handshake.address.address;              
                io.sockets.emit('pageview', {
                    'say': say,
                    'count': count,
                    'news_id': message
                });

            });

        });




    });


});



socket.on('disconnect', function() {

    var news_id = 0;
    console.log("Socket disconnected:" + socket.id);

    r.connect({
        host: 'localhost',
        port: 28015
    }, function(err, conn) {

        if (err) throw err;

        r.db('test').table("online").filter({
            socket_id: socket.id
        })("news_id").run(conn).then(function(cursor) {


            cursor.each(function(err, item) {

                news_id = item;


                r.db('test').table("online").filter({
                    socket_id: socket.id
                }).delete().run(conn, function() {

                    r.db('test').table("online").filter({
                        news_id: news_id
                    }).count().run(conn, function(err, res) {
                        if (err) throw err;
                        say = res;
                        count--;
                        console.log("disconnect say" + say);
                        io.sockets.emit('pageview', {
                            'say': say,
                            'count': count,
                            'news_id': news_id
                        });

                    });

                });


                //console.log("news_id"+news_id);




            });
        });




    });




});

});
Run Code Online (Sandbox Code Playgroud)

我的client.js:

 var socket = io.connect("http://192.168.1.198:3000");


    socket.on('connect', function () {

        console.log('Socket connected');
        socket.send(news_id);
        socket.on('pageview', function (msg) {


            if(msg.news_id == news_id)
                        {
                                //$('.count_icon').html(msg.say);
                                console.log(msg.say);

                        }


        });

    });
Run Code Online (Sandbox Code Playgroud)

运行server.js时出现此错误:

Unhandled rejection RqlDriverError: First argument to `run` must be an open connection.
at new RqlDriverError (/root/node_modules/rethinkdb/errors.js:14:13)
at Bracket.TermBase.run (/root/node_modules/rethinkdb/ast.js:129:29)
at /home/node1/server.js:82:90
at tryCatcher (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/util.js:24:31)
at Promise.errorAdapter (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/nodeify.js:35:34)
at Promise._settlePromiseAt (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/promise.js:528:21)
at Promise._settlePromises (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/promise.js:646:14)
at Async._drainQueue (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:177:16)
at Async._drainQueues (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:187:10)
at Async.drainQueues (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:15:14)
at process._tickCallback (node.js:442:13)
Run Code Online (Sandbox Code Playgroud)

Jor*_*lva 5

问题似乎是你打开了太多连接而RethinkDB可能正在关闭其中一些连接.您可以添加逻辑以在每次打开它们时关闭这些连接,您可以使用一个全局连接,或者您可以使用rethinkdbdash之类的东西,通过使用连接池来处理这些连接.

以下是使用全局连接(并使用promises)的代码.

var express = require('express'),
path = require('path'),
http = require('http'),
io = require('socket.io'),
r = require('rethinkdb');


var app = express();

app.configure(function() {
    app.set('port', process.env.PORT || 3000);
    app.use(express.logger('dev'));
    app.use(express.bodyParser())
    app.use(express.static(path.join(__dirname, 'public')));
});

var server = http.createServer(app);
io = io.listen(server);

server.listen(app.get('port'), function() {
    console.log("Express server listening on port " + app.get('port'));
});

var count = 0;
var say = 0;
r.connect({
    host: 'localhost',
    port: 28015
})
.then(function (conn) {
    io.sockets.on('connection', function(socket) {

        count++;
        socket.on('message', function(message) {
            url = message;
            //socket.join(url);
            Promise.resolve()
            .then(function() {
                return r.db('test').table('online').insert({
                    socket_id: socket.id,
                    news_id: message
                }).run(conn)
            })
            .then(function() {
                return r.db('test').table("online").filter({
                    news_id: message
                }).count().run(conn);
            })
            .then(function (res) {
                say = res;
                console.log("connect say" + say);
                io.sockets.emit('pageview', {
                    'say': say,
                    'count': count,
                    'news_id': message
                });
            });
        });

        socket.on('disconnect', function() {
            var news_id = 0;
            console.log("Socket disconnected:" + socket.id);
            Promise.resolve()
                .then(function () {
                    return r.db('test').table("online").filter({
                        socket_id: socket.id
                    })("news_id").run(conn);
                })
                .then(function (cursor) {
                    cursor.each(function(err, item) {
                        news_id = item;
                        Promise.resolve()
                            .then(function () {
                                return r.db('test').table("online").filter({
                                    socket_id: socket.id
                                }).delete().run(conn);
                            })
                            .then(function () {
                                return r.db('test').table("online").filter({
                                    news_id: news_id
                                }).count().run(conn);
                            })
                            .then(function (res) {
                                say = res;
                                count--;
                                console.log("disconnect say" + say);
                                io.sockets.emit('pageview', {
                                    'say': say,
                                    'count': count,
                                    'news_id': news_id
                                });
                            });
                    });
                });
        });
    });
})
Run Code Online (Sandbox Code Playgroud)