如何在Rabbit.js上创建REP/REQ

mr.*_*r.b 2 rabbitmq node.js

我已经在.Net上使用RabbitMQ已经有一段时间了,我没有太多问题.现在我转向使用node.js的rabbit.js,我对它并不熟悉.rabbit.js的文档有限.我所知道的只是基本的PUSH/PULL或PUB/SUB.现在我想做REQ/REP,我不知道怎么做.任何人都可以分享一些片段.

非常感谢您的回复.

最好,

kmp*_*mpm 14

这可能是你要求的,但我有一个snipplet(即使它很长)使用node-amqp代替REQ/RES与rabbit.js 进行RPC .我所做的与您在RabbitMQ教程中可以找到的有关RPC的内容类似

目前,消息中的内容应该是一个对象(哈希),它将由amqp模块转换为json.

AmqpRpc类在初始化时采用amqp连接,然后它应该只是调用makeRequest并等待回调中的响应.响应具有函数形式(错误,响应),其中错误可能是超时错误

对不起,这不是你要求的,但它可能足够接近.我还在github上发布了代码:https://gist.github.com/2720846

编辑: 更改了示例以支持多个未完成的请求.

amqprpc.js

var amqp = require('amqp')
  , crypto = require('crypto')

var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';

exports = module.exports = AmqpRpc;

function AmqpRpc(connection){
  var self = this;
  this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
  this.requests = {}; //hash to store request in wait for response
  this.response_queue = false; //plaseholder for the future queue
}

AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
  var self = this;
  //generate a unique correlation id for this call
  var correlationId = crypto.randomBytes(16).toString('hex');

  //create a timeout for what should happen if we don't get a response
  var tId = setTimeout(function(corr_id){
    //if this ever gets called we didn't get a response in a 
    //timely fashion
    callback(new Error("timeout " + corr_id));
    //delete the entry from hash
    delete self.requests[corr_id];
  }, TIMEOUT, correlationId);

  //create a request entry to store in a hash
  var entry = {
    callback:callback,
    timeout: tId //the id for the timeout so we can clear it
  };

  //put the entry in the hash so we can match the response later
  self.requests[correlationId]=entry;

  //make sure we have a response queue
  self.setupResponseQueue(function(){
    //put the request on a queue
    self.connection.publish(queue_name, content, {
      correlationId:correlationId,
      contentType:CONTENT_TYPE,
      replyTo:self.response_queue});
  });
}


AmqpRpc.prototype.setupResponseQueue = function(next){
  //don't mess around if we have a queue
  if(this.response_queue) return next();

  var self = this;
  //create the queue
  self.connection.queue('', {exclusive:true}, function(q){  
    //store the name
    self.response_queue = q.name;
    //subscribe to messages
    q.subscribe(function(message, headers, deliveryInfo, m){
      //get the correlationId
      var correlationId = m.correlationId;
      //is it a response to a pending request
      if(correlationId in self.requests){
        //retreive the request entry
        var entry = self.requests[correlationId];
        //make sure we don't timeout by clearing it
        clearTimeout(entry.timeout);
        //delete the entry from hash
        delete self.requests[correlationId];
        //callback, no err
        entry.callback(null, message);
      }
    });
    return next();    
  });
}
Run Code Online (Sandbox Code Playgroud)

关于如何使用它的一个小例子可以在下面找到.保存两个代码部分,然后运行...

node client.js
Run Code Online (Sandbox Code Playgroud)

如果您没有服务器来提供回复,请求将超时.

client.js

//exmaple on how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});

var rpc = new (require('./amqprpc'))(connection);

connection.on("ready", function(){
  console.log("ready");
  var outstanding=0; //counter of outstanding requests

  //do a number of requests
  for(var i=1; i<=10 ;i+=1){
    //we are about to make a request, increase counter
    outstanding += 1;
    rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
      if(err)
        console.error(err);
      else
        console.log("response", response);
      //reduce for each timeout or response
      outstanding-=1;
      isAllDone();
    });
  }

  function isAllDone() {
    //if no more outstanding then close connection
    if(outstanding === 0){
      connection.end();
    }
  }

});
Run Code Online (Sandbox Code Playgroud)

我甚至会投入一个样本服务器以获得良好的衡量标准

server.js

//super simple rpc server example
var amqp = require('amqp')
  , util = require('util');

var cnn = amqp.createConnection({host:'127.0.0.1'});

cnn.on('ready', function(){
  console.log("listening on msg_queue");
  cnn.queue('msg_queue', function(q){
      q.subscribe(function(message, headers, deliveryInfo, m){
        util.log(util.format( deliveryInfo.routingKey, message));
        //return index sent
        cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
            contentType:'application/json',
            contentEncoding:'utf-8',
            correlationId:m.correlationId
          });
      });
  });
});
Run Code Online (Sandbox Code Playgroud)