RabbitMQ 连接空闲超时导致通道泄漏

occ*_*asl 5 amqp rabbitmq node.js node-amqp

我将 node-amqp 与 Node.js 结合使用来消耗队列中的消息。我们的管理员有 60 分钟的空闲连接超时,这会导致绑定额外的队列并孤立我之前创建的队列的先前通道。我的日志如下所示,请注意每小时绑定一个额外队列(下一次将是 3 个,然后是 4 个,依此类推):

[app] 2014-08-07T16:15:25.000Z: 2014-08-07T16:15:25.174Z - debug: ConsumerTag: node-amqp-145-0.9590792271774262
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.751Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.731Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is subscribing...
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is binding...
[app] 2014-08-07T16:15:23.000Z: 2014-08-07T16:15:23.831Z - debug: AMQP Queue is initializing...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.933Z - debug: ConsumerTag: node-amqp-145-0.6444592161569744
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.658Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.341Z - debug: AMQP Queue is subscribing...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.341Z - debug: AMQP Queue is binding...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.067Z - debug: AMQP Queue is initializing...
Run Code Online (Sandbox Code Playgroud)

以下是我配置连接和队列的方式(注意事件queueUnbindOk,并且basicCancel在空闲超时时不会被调用,而是甚至会发生连接错误:

 // Establish connection to AMQP
    var conn = amqp.createConnection({url: amqp_url, clientProperties: { applicationName: "ma-services", capabilities: { consumer_cancel_notify: true }}});

    conn.on('ready', function () {
        logger.debug('AMQP Queue is initializing...');
        var ctag;
        var queue = conn.queue('ma.services.gapns', {'durable': true, 'autoDelete': false}, function (queue) {
            try {
                logger.debug('AMQP Queue is binding...');
                queue.bind('ma.integration.exchange', 'gapns');
                logger.debug('AMQP Queue is subscribing...');
                queue.subscribe( function (msg) {
                    // do stuff

                }).addCallback(function(ok) {
                    logger.debug("ConsumerTag: " + ok.consumerTag);
                    ctag = ok.consumerTag;
                });
            }
            catch (e) {
                logger.error("Exception occurred while processing notifications: " + e);
            }
        });
        queue.on('queueBindOk', function () {
            logger.debug('AMQP Queue bound successfully.');
        });
        queue.on('basicCancel', function() {
            // this never gets called
            logger.debug('The channel has been canceled (likely server timeout).');
        });
        queue.on('queueUnbindOk', function () {
            // Unsubscribe from queue to prevent orphan -- never gets called
            logger.debug('Unsubscribing consumertag: ' + ctag);
            queue.unsubscribe(ctag);
            logger.debug('AMQP Queue unbound successfully.');
        });
    });
Run Code Online (Sandbox Code Playgroud)

您对如何正确配置连接以优雅地处理空闲超时有什么建议吗?