Tim*_*per 8 ruby amqp rabbitmq
我有一个场景,我需要非常快速地分配和处理作业.我将在队列中快速填充大约45个作业,我可以同时处理大约20个作业(5台机器,每台4个核心).每个作业花费不同的时间,并且使事情变得复杂,垃圾收集是一个问题所以我需要能够让消费者离线进行垃圾收集.
目前,我有一切都在使用pop(每个消费者每隔5ms弹出一次).这似乎是不合需要的,因为它转换为每秒600次popm请求到rabbitmq.
我很乐意,如果有一个pop命令会像订阅一样,但只有一条消息.(进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)
我试图欺骗AMQP gem来做这样的事情,但它不起作用:在队列为空并且不再向消费者发送消息之前,我似乎无法取消订阅.取消订阅的其他方法我担心会失去信息.
consume_1.rb:
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
Run Code Online (Sandbox Code Playgroud)
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
Run Code Online (Sandbox Code Playgroud)
producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
Run Code Online (Sandbox Code Playgroud)
我将启动consume_many.rb和producer.rb.消息将按预期流动.
当我启动consume_1.rb时,它会获取所有其他消息(如预期的那样).但它永远不会取消订阅,因为它永远不会完成处理它的所有消息......所以就这样了.
我如何获得consume_1.rb订阅队列,获取单个消息,然后将自己从负载均衡器环中取出,这样它就可以正常工作,而不会丢失可能在队列中的任何其他待处理作业否则会被安排发送到这个过程?
蒂姆
Iva*_*van 13
这是AMQP gem的一个简单但很难记录的功能,你需要的是:
在您的消费者中:
channel = AMQP::Channel.new(connection, :prefetch => 1)
Run Code Online (Sandbox Code Playgroud)
然后使用您的订阅块,执行:
queue.subscribe(:ack => true) do |queue_header, payload|
puts "Received a message: #{payload}."
# Do long running work here
# Acknowledge message
queue_header.ack
end
Run Code Online (Sandbox Code Playgroud)
这是什么告诉RabbitMQ一次只发送你的消费者1消息,而不是发送另一条消息,直到你在长时间运行一个长时间运行的任务后调用ack队列头.
我可能需要对此进行纠正,但我相信direct交换会更适合这项任务.
| 归档时间: |
|
| 查看次数: |
3634 次 |
| 最近记录: |