红宝石块内的赛璐珞异步不起作用

Vir*_*ren 14 ruby celluloid ruby-2.2

尝试在我的工作示例中实现Celluloid 异步似乎表现出奇怪的行为.

这里我的代码看起来

 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!
Run Code Online (Sandbox Code Playgroud)

但是当我运行上面的代码时,我从未看到过" 在后台运行 "

但是,如果我睡觉,代码似乎工作.

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!
Run Code Online (Sandbox Code Playgroud)

任何的想法?为什么在上述两个场景中存在这样的差异.

谢谢.

dig*_*ist 17

你的主循环主宰了actor /应用程序的线程.

您的所有程序都在生成后台进程,但从不运行它们.你需要sleep在循环中纯粹允许后台线程得到关注.

拥有无条件循环产生无限的后台进程通常不是一个好主意,就像你在这里一样.应该有一个延迟或条件语句放在那里......否则你只有一个无限循环产生永远不会被调用的东西.

想想这样:如果你把puts "looping"它放在你的循环中,而你却看不到Running in the background......你会看到looping一遍又一遍.


方法#1:使用everyafter阻止.

解决这个问题的最好方法是不要在sleep内部使用loop,而是使用afterevery阻止,如下所示:

every(0.1) {
    on_background
}
Run Code Online (Sandbox Code Playgroud)

或者最重要的是,如果要在再次运行之前确保进程完全运行,请使用after:

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1) { run_method }
 end
Run Code Online (Sandbox Code Playgroud)

使用a loop不是一个好主意,async除非有某种流量控制完成,或阻塞过程,如@server.accept...否则它只会毫无理由地拉出100%的CPU核心.

顺便说一句,你也可以使用now_and_every,以及now_and_after太...这将运行块向右走,那么你希望的时间后再次运行.

使用every在这个要点中显示:


在我看来,理想的情况是:

这是一个粗略但可立即使用的示例:


require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."
Run Code Online (Sandbox Code Playgroud)

这将是它的输出,如果ONE_AT_A_TIMEtrue:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
Run Code Online (Sandbox Code Playgroud)

这将是它的输出,如果ONE_AT_A_TIMEfalse:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
Run Code Online (Sandbox Code Playgroud)

您需要更"正常"而不是"线程化"以正确发布任务并保留范围和状态,而不是在线程/角色之间发出命令......这就是everyafter块提供的.除此之外,无论哪种方式,这都是很好的做法,即使你没有Global Interpreter Lock处理,因为在你的例子中,你似乎并没有处理阻塞过程.如果你有一个阻塞过程,那么无论如何都要有一个无限循环.但是,由于你只是在处理之前最终会产生无数的后台任务,你需要使用sleep类似你的问题开始,或者完全使用不同的策略,并使用every,after这是如何Celluloid 本身鼓励您在处理任何类型插座上的数据时进行操作.


方法#2:使用递归方法调用.

这恰好出现在Google Group中.下面的示例代码实际上允许执行其他任务,即使它是无限循环.

这种方法不太理想,因为它可能会产生更多的开销,产生一系列纤维.

def work
    # ...
    async.work
end
Run Code Online (Sandbox Code Playgroud)

问题#2:ThreadFiber行为有关.

第二个问题是为什么以下方法有效: loop { Thread.new { puts "Hello" } }

这产生了无数个进程线程,由RVM直接管理.即使有一个Global Interpreter LockRVM你正在使用...只意味着没有green threads使用这些操作系统本身,而不是......这些都是由进程本身处理提供.进程的CPU调度程序Thread毫不犹豫地自行运行.在示例的情况下,Thread运行非常快,然后死亡.

async任务相比,Fiber使用a.所以在默认情况下发生了什么:

  1. 流程开始.
  2. 演员实例化.
  3. 方法调用调用循环.
  4. 循环调用async方法.
  5. async 方法将任务添加到邮箱.
  6. 不调用邮箱,循环继续.
  7. 另一项async任务添加到邮箱中.
  8. 这无限继续.

上面是因为循环方法本身是一个Fiber调用,它永远不会被挂起(除非sleep被调用!)因此添加到邮箱的附加任务永远不会调用新的Fiber.A的Fiber行为与a不同Thread.这是讨论差异的一篇很好的参考资料:


问题3:CelluloidCelluloid::ZMQ行为.

第三个问题是为什么include Celluloid表现不同于Celluloid::ZMQ......

那是因为Celluloid::ZMQ使用基于反应堆的事件邮箱,而Celluloid使用基于条件变量的邮箱.

阅读有关流水线和执行模式的更多信息:

这就是两个例子之间的区别.如果您对这些邮箱的行为方式有其他疑问,请随时在Google网上论坛上发帖...您面临的主要动态是GILFibervs. Threadvs. Reactor行为互动的独特性.

您可以在此处阅读有关reactor-pattern的更多信息:

并看到Celluloid::ZMQ这里使用的具体反应器:

因此,在事件邮箱方案中发生的事情是,当sleep命中时,这是阻塞调用,这会导致反应器移动到邮箱中的下一个任务.

但是,这也是你的情况所特有的,所使用的特定反应器Celluloid::ZMQ是使用永恒的C库...特别是0MQ库.该反应堆的外部应用程序,其行为比不同Celluloid::IOCelluloid本身,这也是为什么比你所期望的行为发生不同.

多核支持替代方案

如果维护状态和范围对您来说并不重要,如果您使用jRubyRubinius不限于一个操作系统线程,而不是使用MRI具有该操作系统线程,则Global Interpreter Lock可以实例化多个actor并async同时在actor之间发出调用.

但我的拙见是,使用非常高频率的计时器,例如0.0010.1在我的例子中,你会得到更好的服务,这对于所有意图和目的而言似乎是瞬间的,但也允许演员线程有足够的时间来切换光纤并运行邮箱中的其他任务.

  • 令人敬畏的怪异答案. (3认同)
  • @digitalextremist遗憾的是它没有.请看我的答案,我认为它可以解决. (2认同)