我在面向服务的体系结构中使用RabbitMQ作为消息队列,其中许多单独的Web服务发布绑定到RabbitMQ队列的消息.这些队列由各种消费者订阅,这些消费者执行后台工作; RabbitMQ的一个漂亮的香草用例.
现在我想改变一些队列参数(具体来说,我想将队列绑定到一个带有某个路由键的新死信交换).我的问题是,由于几个原因,在生产系统上进行这种改变是有问题的.
对于我来说,转换到这些新队列的最佳方式是什么,而不会丢失生产系统中的消息?
我已经考虑了从版本化队列名称到使用新设置创建新vhost以进行所有更改的所有内容.
以下是我面临的一些问题:
因为RabbitMQ队列是幂等的,所以不同的Web服务在发布之前已经声明了队列(如果它们尚不存在).更改队列参数(但保持相同的路由键)后,队列声明失败,RabbitMQ关闭通道.
我想在更改队列时不丢失消息(这里我打算订阅一个保存消息然后重新发布到新队列的独占消费者).
不同出版商与消费者群体之间的一般协调(或者更好的是,避免需要协调他们的方式).
我想知道这是否可行.我想从队列中提取任务,并且在将ack发送回RabbitMQ通知工作已完成之前,可能需要3秒或更长(可能)分钟的时间.工作由用户完成,因此这就是处理工作所需时间的原因.
我不想在弹出队列后立即确认消息,因为如果没有收到确认,我希望消息重新排队.任何人都可以给我任何有关如何解决我的问题的见解?
我正在项目中使用带有RabbitMQ的Bunny Gem,以便多个应用程序可以执行RPC调用.我正在努力想办法用这些调用进行集成测试.当然我可以模拟和对象并存根一个方法来返回一些假数据,但是确实像RabbitMQ这样的VCR gem会很好.关于如何最好地测试兔子的任何指导?
在初始化程序中设置rabbitmq连接时
#config/initializers/rabbitmq.rb
$rabbitmq_connection = Bunni.new "amqp://#{user}:#{pass}@#{host}:#{port}#{vhost}"
$rabbitmq_connection.start
$rabbitmq_channel = $rabbitmq_connection.create_channel
Run Code Online (Sandbox Code Playgroud)
从我尝试创建交换和发布的地方抛出超时错误
class Publisher
...
exchange = $rabbitmq_channel.topic 'some', {auto_delete: false, passive: true}
end
Run Code Online (Sandbox Code Playgroud)
错误跟踪
E, [2015-10-05T11:59:16.448537 #14889] ERROR -- : Error: Timeout::Error: Timeout::Error from
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/concurrent/continuation_queue.rb:33:in `block in poll'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/concurrent/continuation_queue.rb:30:in `synchronize'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/concurrent/continuation_queue.rb:30:in `poll'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/channel.rb:1774:in `wait_on_continuations'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/channel.rb:1176:in `block in exchange_declare'
/usr/local/rvm/rubies/ruby-2.1.2/lib/ruby/2.1.0/timeout.rb:91:in `block in timeout'
/usr/local/rvm/rubies/ruby-2.1.2/lib/ruby/2.1.0/timeout.rb:101:in `call'
/usr/local/rvm/rubies/ruby-2.1.2/lib/ruby/2.1.0/timeout.rb:101:in `timeout'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/channel.rb:1175:in `exchange_declare'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/exchange.rb:245:in `declare!'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/exchange.rb:83:in `initialize'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/channel.rb:344:in `new'
/home/deployer/project/shared/bundle/ruby/2.1.0/bundler/gems/bunni-cd347c9da757/lib/bunni/channel.rb:344:in `topic'
/home/deployer/project/releases/20151005085039/app/services/publisher.rb:32:in `publish'
Run Code Online (Sandbox Code Playgroud)
如果直接在Publisher中创建连接和通道而不是工作.
class Publisher
...
$rabbitmq_connection = Bunni.new "amqp://#{user}:#{pass}@#{host}:#{port}#{vhost}"
$rabbitmq_connection.start
$rabbitmq_channel = $rabbitmq_connection.create_channel …Run Code Online (Sandbox Code Playgroud) 我正在尝试为现有队列编写使用者.
RabbbitMQ在一个单独的实例中运行,名为"org-queue"的队列已经创建并绑定到交换机.org-queue是一个持久队列,它还有一些额外的属性.
现在我需要从这个队列接收消息.我使用下面的代码来获取队列的实例
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue("org-queue")
Run Code Online (Sandbox Code Playgroud)
它给我一个错误,说明不同的耐用性.默认情况下,Bunny使用的持久= false.所以我添加了持久的true作为参数.现在它说明了其他参数之间的区别.我是否需要指定所有参数才能连接到它?由于rabbitMQ由不同的环境维护,我很难获得所有属性.
有没有办法获取队列列表并在客户端中侦听所需的队列,而不是通过所有参数连接到队列.
我整合兔子创业板的RabbitMQ使用Rails,我应该开始兔子线程Rails所用的应用程序开始启动或做在一个单独的rake任务,所以我可以在一个单独的进程中启动它的初始化?
我想如果我只生成消息,那么我需要在Rails初始化程序中执行它,这样它可以在应用程序中使用,但如果我正在消耗我应该在单独的rake任务中执行,这是正确的吗?
RabbitMQ 提供了一个优先级队列,其中消息可能具有优先级并以相反的优先级传递给消费者。
使用Bunny gem,我创建了一个优先队列。然后,我发布 5 条没有优先级的消息和 2 条优先级为 1 的消息,并检查我的消费者日志。不幸的是,我的消费者告诉我它处理 5 条无优先级消息,然后处理 2 条具有优先级的消息。通过添加睡眠,我确保每条消息至少需要 2 秒来处理。我的频道的预取也设置为 1。这是我使用的示例代码
require "bunny"
require "logger"
logger = Logger.new(STDERR)
bunny = Bunny.new(ENV["AMQP_URL"], logger: logger)
bunny.start
at_exit { bunny.stop }
channel = bunny.channel
channel.prefetch 1
routing_key = "build-show-report"
exchange = channel.exchange("signals", passive: true)
queue = channel.queue("signal.#{routing_key}", durable: true, arguments: {"x-max-priority" => 3})
queue.bind(exchange, routing_key: routing_key)
queue.subscribe(manual_ack: true, block: false) do |delivery_info, properties, payload|
logger.info "Received #{payload}"
sleep 2
channel.acknowledge(delivery_info.delivery_tag, false)
end
5.times {|n| …Run Code Online (Sandbox Code Playgroud) 在Mac上运行Rails应用程序.我在其他终端选项卡中启动rabbit mq server:
$ rabbitmq-server
我在我的Web应用程序中做了工作人员,当邮件被发送时,我在/usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log中得到了这个.
= INFO REPORT ==== 2015年4月21日:: 23:02:47 ===
接受AMQP连接<0.4286.0>(127.0.0.1:57509 - > 127.0.0.1:5672)
=错误报告==== 2015年4月21日:: 23:02:50 ===
关闭AMQP连接<0.4286.0>(127.0.0.1:57509 - > 127.0.0.1:5672):{heartbeat_timeout,running}
笔记:
我将所有rabbitmq配置保留为默认值,我的意思是在mailer.rb中:
Run Code Online (Sandbox Code Playgroud):heartbeat => 10
相同的配置在Ubuntu中完美运行.
我有一个我在.NET中编写的应用程序,可以使用单个使用者监视多个RabbitMq队列.
例如:
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
var _consumer = new QueueingBasicConsumer(channel);
string[] list = new string[] { "Queue1", "Queue2", "Queue3", "Queue4" };
_consumer = new QueueingBasicConsumer(channel);
foreach (string currQueueName in list)
{
channel.QueueDeclare(currQueueName, false, false, false, null);
channel.BasicConsume(currQueueName, true, _consumer);
}
while (true)
{
var ea = (BasicDeliverEventArgs)_consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
ProcessMessage(message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
基本上,我只是希望能够跨多个队列分配工作,但只有一个应用程序全部使用它们(或者可以部署多个应用程序并执行相同的功能).
我正试图在队列中分散工作,以便消费者在队列中平等地工作.
这可能是使用Bunny或本机Ruby驱动程序吗?