我可以在Rails 3中使用AMQP的请求/回复 - RPC模式吗?

Bob*_*ton 5 asynchronous ruby-on-rails amqp rabbitmq

由于类似于本讨论中的原因,我正在尝试使用消息传递来代替REST,以便从一个Rails 3应用程序到另一个Rails 3应用程序进行同步RPC调用.这两个应用程序都在运行.

"server"应用程序config/initializers/amqp.rbrubyamqp.info文档中有一个基于请求/回复模式的文件:

require "amqp"

EventMachine.next_tick do
  connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
  channel    = AMQP::Channel.new(connection)

  requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
  requests_queue.subscribe(:ack => true) do |metadata, payload|
    puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
    channel.default_exchange.publish(Time.now.to_s,
                                     :routing_key    => metadata.reply_to,
                                     :correlation_id => metadata.message_id,
                                     :mandatory      => true)
    metadata.ack
  end

  Signal.trap("INT") { connection.close { EventMachine.stop } }
end
Run Code Online (Sandbox Code Playgroud)

在"客户端"应用程序中,我想在视图中将同步调用的结果呈现给"服务器".我意识到这有点像amqp gem这样的固有异步库的舒适区域,但我想知道是否有办法让它工作.这是我的客户config/initializers/amqp.rb:

require 'amqp'

EventMachine.next_tick do
  AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'  
  Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end
Run Code Online (Sandbox Code Playgroud)

这是控制器:

require "amqp"

class WelcomeController < ApplicationController
  def index    
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect
    end   
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end
Run Code Online (Sandbox Code Playgroud)

当我在不同的端口上启动两个应用程序并访问welcome#index视图时. @message在视图中为零,因为结果尚未返回.结果在渲染视图后几毫秒到达并显示在控制台上:

$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"
Run Code Online (Sandbox Code Playgroud)

这里不足为奇:subscribe显然不适合同步通话.令人惊讶的是,我无法在AMQP gem源代码或任何在线文档中找到同步替代方案.是否有替代方案subscribe可以提供我想要的RPC行为?鉴于系统的其他部分我想要使用合法的异步调用,那么bunny gem似乎不适合这项工作.我应该再看一眼吗?

编辑以响应Sam Stokes

感谢Sam提供的抛出指针:async/async.callback.我之前没有见过这种技术,这正是我首先尝试用这个实验学习的东西.send_response.finish在Rails 3中已经消失了,但是我能够通过一个小小的改动让他的例子至少为一个请求工作:

render :text => @message
rendered_response = response.prepare!
Run Code Online (Sandbox Code Playgroud)

后续请求失败!! Unexpected error while processing request: deadlock; recursive locking.这可能是Sam在关于获取ActionController以允许并发请求的评论所得到的,但引用的要点仅适用于Rails 2. config.allow_concurrency = true在development.rb中添加在Rails 3中摆脱了这个错误,但是This queue already has default consumer.从AMQP 引出.

我觉得这只牦牛已经剃光了.;-)

虽然有趣,但对于简单的RPC来说,这显然有些过分.像这样的Sinatra流示例似乎是客户端与回复交互的更合适的用例.Tenderlove还有一篇博文,内容是关于即将在Rails 4中传输可与AMQP配合使用的事件的方法.

正如Sam在讨论HTTP替代方案时指出的那样,REST/HTTP对我的系统中涉及两个Rails应用程序的RPC部分非常有意义.系统的其他部分涉及更多经典的异步事件发布到Clojure应用程序.对于这些,Rails应用程序只需要以"一劳永逸"的方式发布事件,因此AMQP可以使用我的原始代码在没有回复队列的情况下正常工作.

Sam*_*kes 5

您可以获得所需的行为 - 让客户端发出简单的HTTP请求,您的Web应用程序异步响应 - 但您需要更多技巧.您需要使用Thin支持异步响应:

require "amqp"

class WelcomeController < ApplicationController
  def index
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect

      # Trigger Rails response rendering now we have the message.
      # Tested in Rails 2.3; may or may not work in Rails 3.x.
      rendered_response = send_response.finish

      # Pass the response to Thin and make it complete the request.
      # env['async.callback'] expects a Rack-style response triple:
      # [status, headers, body]
      request.env['async.callback'].call(rendered_response)
    end

    # This unwinds the call stack, skipping the normal Rails response
    # rendering, all the way back up to Thin, which catches it and
    # interprets as "I'll give you the response later by calling
    # env['async.callback']".
    throw :async
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end
Run Code Online (Sandbox Code Playgroud)

就客户而言,在返回响应之前,结果与同步调用中的Web应用程序阻塞无法区分; 但现在您的Web应用程序可以同时处理许多此类请求.

警告!

Async Rails是一种先进的技术; 你需要知道你在做什么.Rails的某些部分并不需要突然拆除它们的调用堆栈.该throw会绕过任何机架中间件不知道赶上并重新抛出它(这里是一个比较旧的部分解决方案).ActiveSupport的开发模式类重新加载将在throw不等待响应的情况下重新加载应用程序的类,如果您的回调引用了已经重新加载的类,则可能导致非常混乱的破坏.您还需要很好地询问ActionController以允许并发请求.

请求/响应

您还需要匹配请求和响应.既然这样,如果请求1到达,然后请求2到达之前请求1产生响应,那么它的未定义该请求将接收响应1(在队列中的消息分布订阅的队列中的消费者之间循环).

你可以通过检查CORRELATION_ID做到这一点(这你必须明确地设置,顺便说一句 - 的RabbitMQ不会为你做!),然后重新入队的消息,如果这不是你在等待响应.我的方法是创建一个持久的Publisher对象,该对象将跟踪打开的请求,侦听所有响应,并根据correlation_id查找要调用的相应回调.

替代方案:只使用HTTP

你真的解决两个不同的(和棘手!)问题在这里:说服轨道/薄异步处理请求,以及针对AMQP的发布-订阅模型上实现的请求-响应语义.鉴于你说这是为了在两个Rails应用程序之间进行调用,为什么不使用HTTP,它已经具有你需要的请求 - 响应语义?这样你只需要解决第一个问题.如果使用非阻塞HTTP客户端库(例如em-http-request),仍可以获得并发请求处理.