RabbitMQ和消息优先级

Kel*_*lly 33 rabbitmq

RabbitMQ有任何消息优先级的概念吗?我有一个问题是一些更重要的消息由于在队列中排在它之前的不太重要的消息而变慢.我希望优先级高的优先级移动到队列的前面.

我知道我可以使用两个队列"快速"队列和"慢"队列来近似这个,但这似乎是一个黑客.

有没有人知道使用RabbitMQ更好的解决方案?

wom*_*ble 58

这个问题的答案已经过时了.从RabbitMQ 3.5.0开始,现在有针对AMQP标准的每个消息优先级的内核支持.该文档包含所有血腥细节,但简而言之:

  • 您需要在创建队列时定义队列的优先级范围;
  • 没有设置优先级的消息优先级为0;
  • 数字优先级高于队列上设置的最大值的消息将获得队列支持的最高优先级.

文档中有更多有趣的警告.非常值得一读.

  • 它似乎不是消息优先级而是消费者优先级,我认为这是不同的概念而不是上述问题的答案. (2认同)

Ste*_*tin 20

兔子没有优先权的概念,正如布莱恩简洁地说的那样,前面的那个首先到达那里.;-)

我建议实现一组队列,用于满足您的特定消息传递需求,让这些队列模拟您的优先级需求,比如称他们为"MyQueueP1","MyQueueP2"等,然后让我们的消费者检查P1之前P2(等)和服务消息之前.

如果您的消息具有高优先级,则可以通过合适的路由密钥将其发布到适当的优先级队列.

[更新]检查此问题: 在FIFO Qeueing系统中,实现优先级消息传递的最佳方式是什么

[更新] 根据最新的RabbitMQ版本3.5.0,这个答案现在已经过时,应该只对本版本之前的版本有效. /sf/answers/2034780191/


van*_*nza 10

IIRC RabbitMQ仍然使用AMQP协议版本0.9.1(获取此处的规范).规范肯定提到消息优先级:

Messages may have a priority level. A high priority message is sent ahead of lower     priority messages
waiting in the same message queue. When messages must be discarded in order to maintain a specific
service quality level the server will first discard low-priority messages.
Run Code Online (Sandbox Code Playgroud)

和:

Note that in the presence of multiple readers from a queue, or client transactions, or use of priority fields,
or use of message selectors, or implementation-specific delivery optimisations the queue MAY NOT
exhibit true FIFO characteristics.
Run Code Online (Sandbox Code Playgroud)

规范说优先级是必须的,所以我猜RabbitMQ应该实现它,但你可能想查阅它的文档.


Liq*_*pie 5

是的,RabbitMQ 支持优先队列。

要使队列作为优先队列工作,请x-max-priority在声明队列时提供属性。

属性x-max-priority定义队列支持的最大优先级数。

在 Java 中,您可以执行以下操作:

Map<String, Object> props = new HashMap<>();
props.put("x-max-priority", 10); // max priority number as 10
channel.queueDeclare(QUEUE_NAME, durable, false, false, props);
Run Code Online (Sandbox Code Playgroud)

要发布特定优先级的消息,请执行以下操作:

String message = "My message with priority 7";
AMQP.BasicProperties.Builder basicProps = new AMQP.BasicProperties.Builder();
basicProps.contentType("text/plain")
            .priority(7);
channel.basicPublish("", QUEUE_NAME, basicProps.build(), message.getBytes());
Run Code Online (Sandbox Code Playgroud)