RabbitMQ/AMQP:单个队列,同一个消息的多个消费者?

mik*_*ana 126 messaging amqp rabbitmq node.js node-amqp

我一般只是开始使用RabbitMQ和AMQP.

  • 我有一个消息队列
  • 我有多个消费者,我想用同样的消息做不同的事情.

大多数RabbitMQ文档似乎都专注于循环,即单个消费者使用单个消息,负载在每个消费者之间传播.这确实是我见证的行为.

例如:生产者有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})
Run Code Online (Sandbox Code Playgroud)

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})
Run Code Online (Sandbox Code Playgroud)

如果我启动消费者两次,我可以看到每个消费者正在使用循环行为中的备用消息.例如,我将在一个终端中看到消息1,3,5,在另一个终端中看到2,4,6.

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即,两个消费者都得到消息1,2,3,4,5,6?在AMQP/RabbitMQ中这叫什么?它是如何正常配置的?

  • 这常见吗?我应该将交换路由信息分成两个单独的队列,而不是一个消费者吗?

mik*_*ana 105

我可以让每个消费者收到相同的消息吗?即,两个消费者都得到消息1,2,3,4,5,6?在AMQP/RabbitMQ中这叫什么?它是如何正常配置的?

不,如果消费者在同一个队列中,则不会.来自RabbitMQ的AMQP Concepts指南:

重要的是要理解,在AMQP 0-9-1中,消息在消费者之间进行负载平衡.

这似乎意味着队列中的循环行为是给定的,而不是可配置的.即,需要单独的队列以便由多个消费者处理相同的消息ID.

这常见吗?我应该将交换路由信息分成两个单独的队列,而不是一个消费者吗?

不可以,单个队列/多个消费者,每个消费者处理相同的消息ID是不可能的.将消息转换为两个单独的队列的交换路由确实更好.

因为我不需要太复杂的路由,所以扇出交换将很好地处理这个问题.我没有过多地关注在交易所早前为节点AMQP有一个"默认交换"让您直接发布消息到连接的概念,但大多数AMQP的消息发布到特定的交易所.

这是我的粉丝交换,包括发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
Run Code Online (Sandbox Code Playgroud)

  • 澄清一下:'default exchange'不是node-amqp特有的.它是一般AMQP概念,具有以下规则:当任何消息发布到默认交换时,路由密钥(用于发布该消息)被AMQP代理视为队列名称.所以看起来你可以直接发布到队列.但你不是.代理只需将每个队列绑定到默认交换,路由密钥等于队列名称. (3认同)
  • 扇出显然是你想要的。它在这里对您没有帮助,但我想我会提到队列中的循环行为是可配置的。`int prefetchCount = 1; channel.basicQos(prefetchCount);` 这将允许每个消费者在完成前一个消息后立即接收消息。而不是接收交替消息。同样不能解决您的问题,但可能对人们了解有用。这里的例子 [http://www.rabbitmq.com/tutorials/tutorial-two-java.html](http://www.rabbitmq.com/tutorials/tutorial-two-java.html) 在公平调度下 (2认同)
  • 在rabbitmq中是否有任何替代Apache activemq jms主题的方法,其中不涉及队列而是多播? (2认同)

dri*_*kin 21

刚看完rabbitmq教程.您发布消息以进行交换,而不是排队; 然后将其路由到适当的队列.在您的情况下,您应该为每个使用者绑定单独的队列.这样,他们就可以完全独立地使用消息.


z90*_*tor 14

最后几个答案几乎是正确的 - 我有大量的应用程序生成需要最终与不同消费者的消息,因此过程非常简单.

如果您希望多个使用者收到同一条消息,请执行以下步骤.

为每个要接收消息的应用创建多个队列,在每个队列属性中,使用amq.direct交换"绑定"路由标记.更改发布应用程序以发送到amq.direct并使用路由标记(不是队列).然后,AMQP将使用相同的绑定将消息复制到每个队列中.作品谎言魅力:)

示例:假设我有一个我生成的JSON字符串,我使用路由标记"new-sales-order"将其发布到"amq.direct"交换,我有一个用于打印订单的order_printer应用程序的队列,我有一个我的计费系统的队列将发送订单的副本并为客户开具发票我有一个网络存档系统,我在那里存档订单是出于历史/合规性的原因而我有一个客户端网络界面,其中订单被跟踪,其他信息来自于订单.

所以我的队列是:order_printer,order_billing,order_archive和order_tracking都绑定了绑定标签"new-sales-order",所有4个都将获得JSON数据.

这是在没有发布应用知道或关心接收应用的情况下发送数据的理想方式.

  • 如果消费者是动态的怎么办?我希望同一应用程序的不同实例收到相同的消息。还有其他选择吗? (3认同)

rob*_*olf 6

是的每个消费者都可以收到相同的消息.请访问 http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq. COM /教程/教程五python.html

用于路由消息的不同方式.我知道它们适用于python和java,但它很好理解原理,决定你在做什么,然后找到如何在JS中做到这一点.听起来你想做一个简单的扇出(教程3),它将消息发送到连接到交换机的所有队列.

与您正在做什么以及您想要做什么的区别在于您要设置和交换或键入扇出.Fanout excahnges将所有消息发送到所有连接的队列.每个队列都有一个消费者,可以分别访问所有消息.

是的,这是常见的,它是AMPQ的功能之一.


Pet*_*hie 6

发送模式是一对一的关系.如果你想"发送"到多个接收器,你应该使用pub/sub模式.有关详细信息,请参阅http://www.rabbitmq.com/tutorials/tutorial-three-python.html.