Eki*_*bal 2 ruby amqp rabbitmq eventmachine
我有一个ruby进程,它使用amqp gem从RabbitMQ队列中消耗msgs,如下所示:
require "bundler/setup"
require "amqp"
require 'eventmachine'
require 'em-http'
AMQP.start(:host => $AMQP_URL) do |connection|
@channel ||= AMQP::Channel.new(connection)
@queue ||= @channel.queue("results")
puts " [*] Waiting for messages. "
@queue.subscribe do |body|
http = EventMachine::HttpRequest.new(URL).post :body => body
http.callback {
# do something
}
http.errback {
$LOG.error "[errorback] -> #{http.error}"
}
end
end
Run Code Online (Sandbox Code Playgroud)
现在URL很慢,队列有很多消息(> 30K),我在日志中遇到了这个错误:
**[errorback] -> unable to create new socket: Too many open files**
Run Code Online (Sandbox Code Playgroud)
任何帮助都将受到高度赞赏,因为我一直在努力寻找如何解决它,但根本没有任何结果.
提前致谢
您消耗的消息太快了.由于您基本上一次获取所有未传递的消息(即,与RabbitMQ可以提供它们一样快),并为每条消息打开HTTP连接,您最终会消耗系统的所有可用资源(在这种情况下是并行打开的套接字的数量) .
在阅读有关消息确认和文档的文档后AMQP::Queue#subscribe,我建议对代码进行以下更改:
AMQP.start(host: $AMQP_URL) do |connection|
@channel ||= AMQP::Channel.new(connection)
@channel.prefetch(5)
@queue ||= @channel.queue("results")
# disable auto-ack, switch to manual mode
@queue.subscribe(ack: true) do |meta, body|
http = EventMachine::HttpRequest.new(URL).post body: body
http.callback {
# do something
# acknowledge message consumption
meta.ack
}
http.errback {
# ...
# do not `meta.ack` here, so the message gets redelivered
}
end
end
Run Code Online (Sandbox Code Playgroud)
这样,您可以非常轻松地节省工作量.
| 归档时间: |
|
| 查看次数: |
370 次 |
| 最近记录: |