postgres LISTEN/NOTIFY rails

Tyl*_*itt 26 postgresql asynchronous ruby-on-rails push-notification

Ryan Bates在本集讨论推送通知时提到了Postgres的LISTEN/NOTIFY功能,但我还没有找到任何关于如何在我的rails应用程序中实现LISTEN/NOTIFY的提示.

这里是适配器内部wait_for_notify函数的文档pg,但我无法弄清楚究竟是什么/为什么设计的.

我们是否需要直接点击适配器的connection变量pg

Pre*_*ids 51

您正在使用该wait_for_notify方法查找正确的位置,但由于ActiveRecord显然不提供使用它的API,因此您需要获取基础PG :: Connection对象(或者其中一个,如果您是运行多线程设置),ActiveRecord正在使用它与Postgres交谈.

一旦你有了连接,只需执行LISTEN你需要的任何语句,然后传递一个块(和一个可选的超时周期)wait_for_notify.请注意,这将阻止当前线程,并独占Postgres连接,直到达到超时或NOTIFY发生(例如,您不希望在Web请求中执行此操作).当另一个进程NOTIFY在你正在侦听的其中一个通道上发出时,将使用三个参数调用该块 - 通知的通道,触发的Postgres后端的pid NOTIFY以及随附的有效负载NOTIFY(如果有的话) ).

我有很长一段时间没有使用ActiveRecord,所以可能有一种更简洁的方法来做到这一点,但这似乎在4.0.0.beta1中正常工作:

# Be sure to check out a connection, so we stay thread-safe.
ActiveRecord::Base.connection_pool.with_connection do |connection|
  # connection is the ActiveRecord::ConnectionAdapters::PostgreSQLAdapter object
  conn = connection.instance_variable_get(:@connection)
  # conn is the underlying PG::Connection object, and exposes #wait_for_notify

  begin
    conn.async_exec "LISTEN channel1"
    conn.async_exec "LISTEN channel2"

    # This will block until a NOTIFY is issued on one of these two channels.
    conn.wait_for_notify do |channel, pid, payload|
      puts "Received a NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end

    # Note that you'll need to call wait_for_notify again if you want to pick
    # up further notifications. This time, bail out if we don't get a
    # notification within half a second.
    conn.wait_for_notify(0.5) do |channel, pid, payload|
      puts "Received a second NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end
  ensure
    # Don't want the connection to still be listening once we return
    # it to the pool - could result in weird behavior for the next
    # thread to check it out.
    conn.async_exec "UNLISTEN *"
  end
end
Run Code Online (Sandbox Code Playgroud)

有关更一般用法的示例,请参阅Sequel的实现.

编辑添加:这是对正在发生的事情的另一种描述.这可能不是幕后的确切实现,但它似乎足以描述这种行为.

Postgres会保留每个连接的通知列表.当您使用连接执行时LISTEN channel_name,您告诉Postgres应该将该通道上的任何通知推送到此连接的列表(多个连接可以侦听同一个通道,因此单个通知最终会被推送到许多列表).一个连接可以LISTEN同时连接到多个通道,并且任何通道的通知都将被推送到同一个列表.

什么wait_for_notify是从连接列表中弹出最早的通知并将其信息传递给块 - 或者,如果列表为空,则休眠直到通知可用并为此做同样的事情(或直到达到超时,其中它刚刚返回nil).由于wait_for_notify只处理单个通知,因此如果要处理多个通知,则必须重复调用它.

当您UNLISTEN channel_name或者UNLISTEN *Postgres将停止将这些通知推送到您的连接列表中时,但已经被推送到该列表的那些将保留在那里,并且wait_for_notify仍将在下次调用时返回它们.这可能会导致一个问题,即在另一个线程检出该连接之后wait_for_notify,在之前但之前累积的通知UNLISTEN仍然存在.在这种情况下,UNLISTEN您可能希望wait_for_notify使用短暂超时调用,直到它返回nil.但是,除非你大量使用LISTENNOTIFY用于许多不同的目的,否则它可能不值得担心.

我在上面添加了一个更好的链接到Sequel的实现,我建议看一下.这很简单.