标签: node-amqp

RabbitMQ/AMQP:单个队列,同一个消息的多个消费者?

我一般只是开始使用RabbitMQ和AMQP.

  • 我有一个消息队列
  • 我有多个消费者,我想用同样的消息做不同的事情.

大多数RabbitMQ文档似乎都专注于循环,即单个消费者使用单个消息,负载在每个消费者之间传播.这确实是我见证的行为.

例如:生产者有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})
Run Code Online (Sandbox Code Playgroud)

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); …
Run Code Online (Sandbox Code Playgroud)

messaging amqp rabbitmq node.js node-amqp

126
推荐指数
5
解决办法
11万
查看次数

如何在RabbitMQ中重新排队消息

在消费者获得消息之后,消费者/工作者进行一些验证然后调用Web服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回最初使用的队列.

我读过RabbitMQ文档.但我对reject,nack和cancel方法之间的区别感到困惑.

amqp rabbitmq node-amqp

28
推荐指数
1
解决办法
4万
查看次数

我应该在每次发布后关闭频道/连接吗?

我在Node.js中使用amqplib,我不清楚代码中的最佳实践.

基本上,我当前的代码调用amqp.connect()Node服务器启动的时间,然后为每个生产者和每个使用者使用不同的通道,从不实际关闭它们中的任何一个.我想知道这是否有意义,或者我应该创建频道,每次要发布消息时发布和关闭它.那连接怎么样?这是一个"良好实践"连接一次,然后在我的服务器的生命周期中保持打开?
在消费者方面 - 我可以使用单个连接和单个通道来监听多个队列吗?

感谢您的任何澄清

rabbitmq node-amqp

14
推荐指数
1
解决办法
8401
查看次数

node-amqp无法向RabbitMQ发送消息

我正在试试rabbitmq教程,ruby版本工作正常,但node.js版本无法发送消息.我不知道出了什么问题.

var amqp       = require('amqp');
var amqp_hacks = require('./amqp-hacks');

var connection = amqp.createConnection({host: 'localhost'});

connection.on('ready', function(){
    connection.publish('hello_node', 'Hello World!');
    console.log(" [x] Sent 'Hello World!'");

    amqp_hacks.safeEndConnection(connection);
});
Run Code Online (Sandbox Code Playgroud)

在我运行之后node send.js,运行过程node recv.js无法回收任何东西.并且rabbitmqctl list_queues不显示hello_node队列.

amqp rabbitmq node.js node-amqp

9
推荐指数
1
解决办法
2732
查看次数

使用node.js amqp模块时如何将AQMP消息缓冲区转换为JSON对象?

我使用node.js amqp模块从队列中读取消息.以下是队列中有可用消息时调用的回调:

function onMessage(message, headers, deliveryInfo)
{
    console.log(message); //This prints buffer
    //how to convert message (which I expect to be JSON) into a JSON object.
    //Also how to get the JSON string from the 'message' which seems to be a buffer
}
Run Code Online (Sandbox Code Playgroud)

谢谢.

type-conversion node.js node-amqp

9
推荐指数
2
解决办法
9030
查看次数

Node-amqp - X尝试后拒绝消息

如何在几次可配置的重新排队尝试后实现拒绝消息的机制?

换句话说,如果我正在订阅队列,我想保证相同的消息不再重新传递超过X次.

我的代码示例:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  try{
    doSomething(data);
  } catch(e) {
   message.reject(true);
  }
}
Run Code Online (Sandbox Code Playgroud)

rabbitmq node.js node-amqp

9
推荐指数
1
解决办法
1302
查看次数

通过websockets发送RabbitMQ消息

寻找一些代码示例来解决这个问题: -

想编写一些代码(Python或Javascript)作为RabbitMQ队列的订户,以便在收到消息时它会通过websockets将消息广播到任何连接的客户端.

我看过Autobahn和node.js(使用" amqp "和" ws "),但无法根据需要开始工作.这是使用node.js的javascript中的服务器代码: -

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});
Run Code Online (Sandbox Code Playgroud)

使用此代码,我可以成功订阅Rabbit中的队列并接收发送到队列的所有消息.同样,我可以将websocket客户端(例如浏览器)连接到服务器并发送/接收消息.但是......如何在指定的位置发送Rabbit队列消息的有效负载作为websocket消息("现在如何通过WEBSOCKET获取")?我认为这与陷入错误的回调有关,或者他们需要以某种方式嵌套......?

或者,如果这可以通过Python(通过Autobahn和pika)更容易完成,那将是很好的.

谢谢 !

rabbitmq websocket node.js node-amqp python-pika

7
推荐指数
1
解决办法
5711
查看次数

amqp.node不会检测到连接丢弃

我们有一个运行socket.io服务器的nod​​e.js脚本,其客户端使用来自RabbitMQ队列的消息.我们最近迁移到Amazon AWS,而RabbitMQ现在是两台机器(冗余实例)的集群.AMQP连接不时丢失(这是从具有冗余VM的高可用性环境到达的限制,我们必须应对它)并且如果尝试重新连接,则DNS选择要连接的实例(它是一个具有数据复制的集群,因此连接到哪个实例并不重要.

问题是从未进行重新连接的尝试; 一段时间后,当连接丢失时,amqp.node显然没有注意到连接已丢失.此外,消费者停止接收消息,而socket.io服务器只是停止接受新连接.

我们有一个55秒的心跳超时(不要与socket.io心跳超时混淆)设置在RabbitMQ URL并使用amqp.node的回调API检查"错误"和"关闭"事件,但它们显然从未发出.队列期望消耗的消息被激活.我们希望节点脚本检测丢失的连接并完成自身,因此环境将自动启动新进程并再次建立连接.

这是代码,也许我们在amqp.node回调API或其他方面做错了.

var express = require('express');
app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];
var red, blue, green, magenta, reset;
red   = '\033[31m';
blue  = '\033[34m';
green  = '\033[32m';
magenta  = '\033[35m';
orange = '\033[43m';
reset = '\033[0m';

var queue = 'ha.atualizacao_mobile';
var urlRabbit = 'amqp://login:password@host?heartbeat=55' // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;

console.log("Original Socket.IO …
Run Code Online (Sandbox Code Playgroud)

rabbitmq amazon-web-services node.js socket.io node-amqp

7
推荐指数
1
解决办法
4148
查看次数

RabbitMQ和node-amqp:确认模式下的Exchange无法确认 - 为什么?

我正在编写一个依赖于RabbitMQ的Node.js应用程序.我使用node-amqp作为连接RabbitMQ的首选库.

一旦我建立了与RabbitMQ的连接,我要做的第一件事就是创建一个交换:

var options = { autoDelete: false, confirm: true, durable: true, type: 'direct' };
connection.exchange('myExchange', options, function (myExchange) {
  // ...
});
Run Code Online (Sandbox Code Playgroud)

这非常有效.正如您所看到的,我正在使用创建交换confirm: true,因此我希望交换在之后处于确认模式.

现在,一旦我尝试发布消息,就会出现问题:

var options = {};
myExchange.publish('', { data: 'foobar' }, options, function () {
  // ...
});
Run Code Online (Sandbox Code Playgroud)

问题是publish函数的回调从未被调用 - 尽管消息已成功发布(正如我在RabbitMQ的Web管理工具中看到的那样).

我是否以错误的方式理解确认模式?这是node-amqp的错误吗?

任何帮助,将不胜感激 :-)

rabbitmq node.js node-amqp

6
推荐指数
1
解决办法
2764
查看次数

RabbitMQ 和 Node.js 如何处理故障转移场景?

我正在使用 RabbitMQ(集群)并使用 Node.js 客户端(node-amqp - https://github.com/postwait/node-amqp)连接到它。RabbitMQ 文档指出,处理故障转移场景(集群节点故障)应该由客户端处理,这意味着客户端应该检测到故障并连接到集群中的另一个节点。支持这种故障转移平衡的最简单方法是什么。node-amqp 客户端支持这个吗?任何示例或解决方案将不胜感激。

谢谢。

failover amqp rabbitmq node.js node-amqp

5
推荐指数
1
解决办法
893
查看次数