使用Ruby的JS风格的异步/非阻塞回调执行,没有像线程这样的重型机器?

And*_*aus 5 ruby

我是一名前端开发人员,对Ruby很熟悉.我只知道如何以同步/顺序方式执行Ruby,而在JS中,我习惯于异步/非阻塞回调.

这是示例Ruby代码:

results = []
rounds = 5

callback = ->(item) {
  # This imitates that the callback may take time to complete
  sleep rand(1..5)

  results.push item

  if results.size == rounds
    puts "All #{rounds} requests have completed! Here they are:", *results
  end
}

1.upto(rounds) { |item| callback.call(item) }

puts "Hello"
Run Code Online (Sandbox Code Playgroud)

目标是在不阻止主脚本执行的情况下运行回调.换句话说,我希望"Hello"行显示在"All 5 requests ..."行上方的输出中.此外,回调应该同时运行,以便最快完成的回调使其首先进入结果数组.

使用JavaScript,我只需将回调调用包装setTimeout为零延迟:

setTimeout( function() { callback(item); }, 0);
Run Code Online (Sandbox Code Playgroud)

这种JS方法不实现真正的多线程/并发/并行执行.在引擎盖下,回调将在一个线程中顺序运行,或者在低级别上交错运行.

但是在实际级别上,它将显示为并发执行:生成的数组将按照与每个回调所花费的时间量相对应的顺序进行填充,即生成的数组将按每个回调完成所花费的时间排序.

请注意,我只想要异步功能setTimeout().我不需要内置的睡眠功能setTimeout()(不要与sleep回调示例中用于模仿耗时的操作混淆).

我试图探讨如何使用Ruby进行JS风格的异步方法,并给出了使用建议:

  1. 多线程.这可能是Ruby的方法,但它需要大量的脚手架:

    1. 手动为线程定义数组.
    2. 手动定义互斥锁.
    3. 为每个回调启动一个新线程,将其添加到数组中.
    4. 将互斥锁传递给每个回调.
    5. 在回调中使用互斥锁进行线程同步.
    6. 确保在程序完成之前完成所有线程.

    与JavaScript相比setTimeout(),这太过分了.因为我不需要真正的并行执行,所以每次我想要异步执行proc时,我都不想构建那么多的脚手架.

  2. 一个复杂的Ruby库,如Celluloid和Event Machine.他们看起来需要花费数周的时间来学习它们.

  3. 这样的自定义解决方案(作者,apeiros @ freenode,声称它非常接近setTimeout的内幕).它几乎不需要构建脚手架,也不涉及线程.但它似乎按照它们执行的顺序同步运行回调.

我一直认为Ruby是一种最接近我理想的编程语言,而JS则是一个穷人的编程语言.并且有点让我不鼓励Ruby在没有涉及重型机械的情况下无法做一件与JS无关的事情.

所以问题是:使用Ruby进行异步/非阻塞回调的最简单,最直观的方法是什么,而不涉及线程或复杂库等复杂的机制?

PS如果在赏金期间没有令人满意的答案,我将通过apeiros挖掘#3,并可能使其成为公认的答案.

And*_*aus 1

好吧,在摆弄线程并研究 apeiros 和 asQuirreL 的贡献之后,我想出了一个适合我的解决方案。

我将首先展示示例用法,最后展示源代码。

示例1:简单的非阻塞执行

首先,我试图模仿一个JS示例:

setTimeout( function() {
  console.log("world");
}, 0);

console.log("hello");

// 'Will print "hello" first, then "world"'.
Run Code Online (Sandbox Code Playgroud)

以下是我如何使用我的小型Rub​​y库做到这一点:

# You wrap all your code into this...
Branch.new do

  # ...and you gain access to the `branch` method that accepts a block.
  # This block runs non-blockingly, just like in JS `setTimeout(callback, 0)`.
  branch { puts "world!" }

  print "Hello, "

end

# Will print "Hello, world!"
Run Code Online (Sandbox Code Playgroud)

请注意,您不必关心创建线程、等待它们完成。唯一需要的脚手架是Branch.new { ... }包装器。

示例 2:使用互斥体同步线程

现在我们假设我们正在处理一些在线程之间共享的输入和输出。

我试图用 Ruby 重现JS代码:

var
  results = [],
  rounds = 5;

for (var i = 1; i <= rounds; i++) {

  console.log("Starting thread #" + i + ".");

  // "Creating local scope"
  (function(local_i) {
    setTimeout( function() {

      // "Assuming there's a time-consuming operation here."

      results.push(local_i);
      console.log("Thread #" + local_i + " has finished.");

      if (results.length === rounds)
        console.log("All " + rounds + " threads have completed! Bye!");

    }, 0);
  })(i);
}

console.log("All threads started!");
Run Code Online (Sandbox Code Playgroud)

此代码产生以下输出:

Starting thread #1.
Starting thread #2.
Starting thread #3.
Starting thread #4.
Starting thread #5.
All threads started!
Thread #5 has finished.
Thread #4 has finished.
Thread #3 has finished.
Thread #2 has finished.
Thread #1 has finished.
All 5 threads have completed! Bye!
Run Code Online (Sandbox Code Playgroud)

请注意,回调以相反的顺序完成。

我们还假设对results数组进行操作可能会产生竞争条件。在 JS 中这从来都不是问题,但在多线程 Ruby 中这必须通过互斥体来解决。

Ruby相当于上面的内容:

Branch.new 1 do

  # Setting up an array to be filled with that many values.
  results = []
  rounds = 5

  # Running `branch` N times:
  1.upto(rounds) do |item|

    puts "Starting thread ##{item}."

    # The block passed to `branch` accepts a hash with mutexes 
    # that you can use to synchronize threads.
    branch do |mutexes|

      # This imitates that the callback may take time to complete.
      # Threads will finish in reverse order.
      sleep (6.0 - item) / 10

      # When you need a mutex, you simply request one from the hash.
      # For each unique key, a new mutex will be created lazily.
      mutexes[:array_and_output].synchronize do
        puts "Thread ##{item} has finished!"
        results.push item

        if results.size == rounds
          puts "All #{rounds} threads have completed! Bye!"
        end
      end
    end
  end

  puts "All threads started."
end

puts "All threads finished!"
Run Code Online (Sandbox Code Playgroud)

请注意,您不必关心创建线程、等待它们完成、创建互斥体并将它们传递到块中。

示例3:延迟块的执行

如果你需要延迟功能的话setTimeout,可以这样做。

JS

setTimeout(function(){ console.log('Foo'); }, 2000);
Run Code Online (Sandbox Code Playgroud)

红宝石

branch(2) { puts 'Foo' }
Run Code Online (Sandbox Code Playgroud)

示例 4:等待所有线程完成

对于 JS,没有简单的方法让脚本等待所有线程完成。为此,您需要一个等待/延迟库。

但在 Ruby 中这是可能的,而且 Branch 使它变得更加简单。如果您在包装器之后编写代码Branch.new{},它将在包装器内的所有分支完成后执行。您不需要手动确保所有线程都已完成,Branch 会为您做到这一点。

Branch.new do
  branch { sleep 10 }
  branch { sleep 5 }

  # This will be printed immediately
  puts "All threads started!"
end

# This will be printed after 10 seconds (the duration of the slowest branch).
puts "All threads finished!"
Run Code Online (Sandbox Code Playgroud)

顺序Branch.new{}包装器将按顺序执行。

来源

# (c) lolmaus (Andrey Mikhaylov), 2014
# MIT license http://choosealicense.com/licenses/mit/

class Branch
  def initialize(mutexes = 0, &block)
    @threads = []
    @mutexes = Hash.new { |hash, key| hash[key] = Mutex.new }

    # Executing the passed block within the context
    # of this class' instance.
    instance_eval &block

    # Waiting for all threads to finish
    @threads.each { |thr| thr.join }
  end

  # This method will be available within a block
  # passed to `Branch.new`.
  def branch(delay = false, &block)

    # Starting a new thread 
    @threads << Thread.new do

      # Implementing the timeout functionality
      sleep delay if delay.is_a? Numeric

      # Executing the block passed to `branch`,
      # providing mutexes into the block.
      block.call @mutexes
    end
  end
end
Run Code Online (Sandbox Code Playgroud)