Redis Pub / Sub导致Web套接字连接挂起

aar*_*nk6 0 ruby sinatra websocket redis

我正在构建一个通过Web套接字连接到服务器的Web应用程序。服务器组件是基于一个小Ruby应用程序sinatraredisfaye-websocket。服务器正在Phusion Passenger上运行。一个单独的updater守护进程不断从各种来源获取更新,并将其发布到redis(使用redisgem和Redis::publish)。

为了将更新推送给客户端,我在Sinatra应用程序中尝试了以下操作:

get '/' do
  if Faye::WebSocket.websocket?(request.env)
    store = Redis.new
    ws = Faye::WebSocket.new(request.env)

    ws.on(:open) do |event|
      store.incr('connection_count')
      puts 'Client connected (connection count: %s)' % store.get('connection_count')
    end

    ws.on(:close) do |event|
      store.decr('connection_count')
      puts 'Client disconnected (connection count: %s)' % store.get('connection_count')
    end

    ws.rack_response

    store.subscribe(:updates) do |on|
      on.message do |ch, payload|
        puts "Got update"
        ws.send(payload) if payload
      end
    end
  end
end
Run Code Online (Sandbox Code Playgroud)

这仅部分起作用。客户端可以成功连接,也可以接收更新,但store.incrand store.decr调用不起作用。另外,连接似乎没有正确关闭-当我启动多个客户端时,我注意到连接堆积,乘客服务器最终停止工作。

日志输出:

devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
devserver_1 | App 614 stdout: Got update
Run Code Online (Sandbox Code Playgroud)

当我注释以下块时,跟踪连接突然起作用:

store.subscribe(:updates) do |on|
  on.message do |ch, payload|
    puts "Got update"
    ws.send(payload) if payload
  end
end
Run Code Online (Sandbox Code Playgroud)

日志输出:

devserver_1 | App 1028 stdout: Client connected (connection count: 1)
devserver_1 | App 1039 stdout: Client connected (connection count: 2)
devserver_1 | App 1039 stdout: Client disconnected (connection count: 1)
devserver_1 | App 1028 stdout: Client disconnected (connection count: 0)
Run Code Online (Sandbox Code Playgroud)

因此,使用Redis::subscribe似乎会以某种方式干扰Web套接字连接。

我该如何解决?

  • Phusion Passenger版本4.0.58
  • 红宝石2.2.0p0(2014-12-25修订版49005)[x86_64-linux-gnu]
  • 辛纳特拉(1.4.6)
  • faye-websocket(0.9.2)

Mar*_*cny 5

我认为这里的问题是Faye使用EventMachine,这意味着您的线程上有一个反应器来处理事件,并调用回调ws.on(:open)ws.on(:close)

现在当你打

store.subscribe(:updates) do |on|
  on.message do |ch, payload|
    puts "Got update"
    ws.send(payload) if payload
  end
end
Run Code Online (Sandbox Code Playgroud)

这是一个阻塞操作-它完全阻塞了当前线程。如果您当前的线程被阻止,反应堆将无法监听事件,然后调用您的回调。

一种解决方案是store.subscribe在不同的线程上运行您的线程,因此阻塞该线程无关紧要。

但是我认为更好的解决方案是使用Redis库的非阻塞版本:

从文档中:

redis = EM::Hiredis.connect
pubsub = redis.pubsub

pubsub.subscribe(:updates).callback do
    puts "Got update"
    ws.send(payload) if payload
end
Run Code Online (Sandbox Code Playgroud)

这两个(Redis + Faye)都应在EventMachine反应器循环中注册,以便将事件分派给这两个事件。