RabbitMQ:主题交换的持久消息

Jul*_*ien 64 amqp rabbitmq

我是RabbitMQ的新手.

我已经建立了一个'主题'交换.消费者可以在发布者之后开始.我希望消费者能够接收在他们启动之前已经发送的消息,而这些消息还没有被消费.

交换使用以下参数设置:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0
Run Code Online (Sandbox Code Playgroud)

使用此参数发布消息:

delivery_mode => 2
Run Code Online (Sandbox Code Playgroud)

消费者使用get()从交换中检索消息.

不幸的是,任何客户端启动之前发布的任何消息都将丢失.我使用了不同的组合.

我想我的问题是交换机没有保留消息.也许我需要在发布者和队列之间有一个队列.但这似乎不适用于通过密钥路由消息的"主题"交换.

知道我该怎么做.我使用Perl绑定Net :: RabbitMQ(应该无关紧要)和RabbitMQ 2.2.0.

Bri*_*lly 69

如果没有可用的连接消费者在发布消息时处理消息,则需要一个持久的队列来存储消息.

交换不存储消息,但队列可以存储.令人困惑的部分是,交流可以被标记为"耐用",但所有真正的意思是,该交易所本身仍然会在那里,如果你重新启动你的经纪人,但它并不能意味着发送到该交换的任何消息都自动持久.

鉴于此,这里有两个选择:

  1. 在启动发布者之前执行管理步骤以自行创建队列.您可以使用Web UI或命令行工具执行此操作.确保将其创建为持久队列,以便即使没有活动的使用者,它也会存储路由到它的任何消息.
  2. 假设您的消费者被编码为始终在启动时声明(并因此自动创建)他们的交换和队列(并且他们宣称它们是持久的),只需在启动任何发布者之前至少运行一次所有消费者.这将确保正确创建所有队列.然后,您可以关闭消费者,直到真正需要它们为止,因为队列将持续存储路由到它们的任何未来消息.

我会去#1.可能没有很多步骤可以执行,您可以始终编写所需的步骤脚本,以便可以重复这些步骤.此外,如果您的所有消费者都要从同一个队列中拉出来(而不是每个队列都有一个专用队列),那么这实际上是一个最小的管理开销.

队列是可以正确管理和控制的.否则你最终可能会被流氓消费者宣布持久排队,使用它们几分钟,但再也不会.不久之后,你将拥有一个永久增长的队列,没有减少其规模,以及即将到来的经纪人启示录.

  • 这是真的,假设每个消费者都需要自己的队列.但是你需要回答的主要问题是,"那些消费者是否需要在你出现之前发送的所有历史信息?".如果他们不关心旧消息,他们可以在启动时声明自己的队列并从该点接收所有消息,但不会更旧. (4认同)
  • 应用程序"声明"队列,然后MQ代理创建它们(如果它们尚不存在).虽然侦听器应用程序声明队列而不是发送者应用程序是有意义的,但是遇到了您所看到的问题.在运行应用程序之前,它可能是声明队列,声明交换,创建vhost等的最佳解决方案. (4认同)

Ski*_*hie 18

如Brian所述,交换机不存储消息,主要负责将消息路由到另一个交换机或队列.如果交换没有绑定到队列,那么发送到该交换的所有消息都将"丢失"

您不需要在发布者脚本中声明固定客户端队列,因为这可能不可扩展.队列可以由您的发布者动态创建,并使用交换到交换绑定在内部路由.

RabbitMQ支持交换到交换绑定,允许拓扑灵活性,解耦和其他好处.您可以在RabbitMQ Exchange to Exchange Bindings [AMPQ]中阅读更多内容

RabbitMQ Exchange交换绑定

示例拓扑

示例Python代码,如果使用队列不存在使用者,则使用持久性创建交换到交换绑定.

#!/usr/bin/env python
import pika
import sys


connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()


#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
Run Code Online (Sandbox Code Playgroud)

  • 缺少"吃所有消息"队列,据我所知,消息仍然不会到达"已故"订阅者 (2认同)
  • 这实际上可以工作@KurtPattyn 和@flyer 因为您可以随时为`Eat All Messages` 创建一个新的消费者,它可以从那里“恢复”未处理的消息,并将它们路由到正确的位置 (2认同)