use*_*780 4 javascript amqp rabbitmq node.js express
我到处都在寻找在Node.js中headers exchange使用RabbitMQ的示例。如果有人能指出我正确的方向,那就太好了。这是我到目前为止的内容:
发布者方法(创建发布者)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
Run Code Online (Sandbox Code Playgroud)
发布方法
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
Run Code Online (Sandbox Code Playgroud)
消费者方法(创建消费者)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
Run Code Online (Sandbox Code Playgroud)
上面的例子适用于上面的例子topics,但是我不知道如何过渡到headers。我很确定我需要更改绑定方法,但是还没有找到任何有关如何精确完成此操作的示例。
任何帮助将不胜感激!
我偶然发现了这个问题,为amqplib寻找相同的答案。不幸的是,像您一样,我发现缺少所有可用的文档 。在查看了源代码,仔细阅读了协议并尝试了几种组合之后,这终于为我完成了。
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
Run Code Online (Sandbox Code Playgroud)
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
Run Code Online (Sandbox Code Playgroud)
完整的工作代码如下。下面的身份验证信息是伪造的,因此您必须使用自己的身份验证信息。我还使用了ES6,nodejs 6.5版和amqplib。给您的标题加上x-前缀和/或使用保留字作为标题名称可能会出现问题,但是我不太确定(我必须查看RabbitMQ源代码)。
emit.js:
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
Run Code Online (Sandbox Code Playgroud)
receive.js:
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
Run Code Online (Sandbox Code Playgroud)